From 60c528adacfab56000a1d7b01d18ad0bf24c9a23 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Tue, 9 Apr 2024 11:16:19 +0400 Subject: [PATCH] fix off by one --- itests/eth_filter_test.go | 35 +++++++++++++++++++++++++++++++++++ node/impl/full/eth_events.go | 25 +++++++++++++++++++++---- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index 9212e60fc01..d77a0ce148b 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -137,6 +137,41 @@ func TestEthNewPendingTransactionFilter(t *testing.T) { } } +func TestEthNewHeadsSubSimple(t *testing.T) { + require := require.New(t) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + kit.QuietAllLogsExcept("events", "messagepool") + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.WithEthRPC()) + ens.InterconnectAll().BeginMining(10 * time.Millisecond) + + // install filter + subId, err := client.EthSubscribe(ctx, res.Wrap[jsonrpc.RawParams](json.Marshal(ethtypes.EthSubscribeParams{EventType: "newHeads"})).Assert(require.NoError)) + require.NoError(err) + + err = client.EthSubRouter.AddSub(ctx, subId, func(ctx context.Context, resp *ethtypes.EthSubscriptionResponse) error { + rs := *resp + block, ok := rs.Result.(map[string]interface{}) + require.True(ok) + blockNumber, ok := block["number"].(string) + require.True(ok) + + blk, err := client.EthGetBlockByNumber(ctx, blockNumber, false) + require.NoError(err) + require.NotNil(blk) + fmt.Printf("block: %v\n", blk) + // block hashes should match + require.Equal(block["hash"], blk.Hash.String()) + + return nil + }) + require.NoError(err) + time.Sleep(2 * time.Second) +} + func TestEthNewPendingTransactionSub(t *testing.T) { require := require.New(t) diff --git a/node/impl/full/eth_events.go b/node/impl/full/eth_events.go index 063590d8dd2..7baba1e81e1 100644 --- a/node/impl/full/eth_events.go +++ b/node/impl/full/eth_events.go @@ -254,6 +254,8 @@ type ethSubscription struct { sendQueueLen int toSend *queue.Queue[[]byte] sendCond chan struct{} + + lastSentTipset *types.TipSetKey } func (e *ethSubscription) addFilter(ctx context.Context, f filter.Filter) { @@ -341,12 +343,27 @@ func (e *ethSubscription) start(ctx context.Context) { e.send(ctx, r) } case *types.TipSet: - ev, err := newEthBlockFromFilecoinTipSet(ctx, vt, true, e.Chain, e.StateAPI) - if err != nil { - break + // Skip processing for tipset at epoch 0 as it has no parent + if vt.Height() == 0 { + continue + } + // Check if the parent has already been processed + parentTipSetKey := vt.Parents() + if e.lastSentTipset != nil && (*e.lastSentTipset) == parentTipSetKey { + continue + } + parentTipSet, loadErr := e.Chain.LoadTipSet(ctx, parentTipSetKey) + if loadErr != nil { + log.Warnw("failed to load parent tipset", "tipset", parentTipSetKey, "error", loadErr) + continue + } + ethBlock, ethBlockErr := newEthBlockFromFilecoinTipSet(ctx, parentTipSet, true, e.Chain, e.StateAPI) + if ethBlockErr != nil { + continue } - e.send(ctx, ev) + e.send(ctx, ethBlock) + e.lastSentTipset = &parentTipSetKey case *types.SignedMessage: // mpool txid evs, err := ethFilterResultFromMessages([]*types.SignedMessage{vt}) if err != nil {