From f4cc5541e4570194e8abdc043964aafdb01f9975 Mon Sep 17 00:00:00 2001 From: Ian Davis Date: Thu, 8 Dec 2022 17:18:45 +0000 Subject: [PATCH] Fix getting event logs by topic --- chain/events/filter/index.go | 64 +++++++++++++++----- chain/types/event.go | 6 ++ itests/eth_filter_test.go | 110 +++++++++++++++++++++++++++++------ itests/fevm_events_test.go | 8 ++- 4 files changed, 150 insertions(+), 38 deletions(-) diff --git a/chain/events/filter/index.go b/chain/events/filter/index.go index c5d0b62effa..55b0639a2ed 100644 --- a/chain/events/filter/index.go +++ b/chain/events/filter/index.go @@ -1,6 +1,7 @@ package filter import ( + "bytes" "context" "database/sql" "errors" @@ -10,6 +11,7 @@ import ( "github.com/ipfs/go-cid" _ "github.com/mattn/go-sqlite3" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -151,6 +153,10 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return xerrors.Errorf("prepare insert entry: %w", err) } + isIndexedValue := func(b uint8) bool { + return b&types.EventFlagIndexedValue == types.EventFlagIndexedValue + } + for msgIdx, em := range ems { for evIdx, ev := range em.Events() { addr, found := addressLookups[ev.Emitter] @@ -189,12 +195,13 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever } for _, entry := range ev.Entries { + value := decodeLogBytes(entry.Value) _, err := stmtEntry.Exec( - lastID, // event_id - entry.Flags&indexed == indexed, // indexed - []byte{entry.Flags}, // flags - entry.Key, // key - entry.Value, // value + lastID, // event_id + isIndexedValue(entry.Flags), // indexed + []byte{entry.Flags}, // flags + entry.Key, // key + value, // value ) if err != nil { return xerrors.Errorf("exec insert entry: %w", err) @@ -210,6 +217,21 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever return nil } +// decodeLogBytes decodes a CBOR-serialized array into its original form. +// +// This function swallows errors and returns the original array if it failed +// to decode. +func decodeLogBytes(orig []byte) []byte { + if orig == nil { + return orig + } + decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig))) + if err != nil { + return orig + } + return decoded +} + // PrefillFilter fills a filter's collection of events from the historic index func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { clauses := []string{} @@ -242,18 +264,19 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { if len(f.keys) > 0 { join := 0 for key, vals := range f.keys { - join++ - joinAlias := fmt.Sprintf("ee%d", join) - joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias)) - clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias)) - values = append(values, key) - subclauses := []string{} - for _, val := range vals { - subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias)) - values = append(values, val) + if len(vals) > 0 { + join++ + joinAlias := fmt.Sprintf("ee%d", join) + joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias)) + clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias)) + values = append(values, key) + subclauses := []string{} + for _, val := range vals { + subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias)) + values = append(values, trimLeadingZeros(val)) + } + clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") } - clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")") - } } @@ -397,3 +420,12 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error { return nil } + +func trimLeadingZeros(b []byte) []byte { + for i := range b { + if b[i] != 0 { + return b[i:] + } + } + return []byte{} +} diff --git a/chain/types/event.go b/chain/types/event.go index c67047d14a2..00c25ca4cb6 100644 --- a/chain/types/event.go +++ b/chain/types/event.go @@ -24,3 +24,9 @@ type EventEntry struct { } type FilterID [32]byte // compatible with EthHash + +// EventEntry flags defined in fvm_shared +const ( + EventFlagIndexedKey = 0b00000001 + EventFlagIndexedValue = 0b00000010 +) diff --git a/itests/eth_filter_test.go b/itests/eth_filter_test.go index 8c900bbf148..edb7c6a87fe 100644 --- a/itests/eth_filter_test.go +++ b/itests/eth_filter_test.go @@ -4,6 +4,7 @@ package itests import ( "context" "encoding/hex" + "encoding/json" "os" "path/filepath" "strconv" @@ -446,16 +447,14 @@ func ParseEthLog(in map[string]interface{}) (*api.EthLog, error) { return el, err } -func TestEthGetLogsAll(t *testing.T) { - require := require.New(t) - - kit.QuietMiningLogs() - - blockTime := 100 * time.Millisecond - dbpath := filepath.Join(t.TempDir(), "actorevents.db") +type msgInTipset struct { + msg api.Message + ts *types.TipSet + reverted bool +} - client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath)) - ens.InterconnectAll().BeginMining(blockTime) +func invokeContractAndWaitUntilAllOnChain(t *testing.T, client *kit.TestFullNode, iterations int) (api.EthAddress, map[api.EthHash]msgInTipset) { + require := require.New(t) ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() @@ -476,13 +475,6 @@ func TestEthGetLogsAll(t *testing.T) { require.NoError(err) t.Logf("actor ID address is %s", idAddr) - const iterations = 10 - - type msgInTipset struct { - msg api.Message - ts *types.TipSet - } - msgChan := make(chan msgInTipset, iterations) waitAllCh := make(chan struct{}) @@ -503,7 +495,7 @@ func TestEthGetLogsAll(t *testing.T) { count += len(msgs) for _, m := range msgs { select { - case msgChan <- msgInTipset{msg: m, ts: change.Val}: + case msgChan <- msgInTipset{msg: m, ts: change.Val, reverted: change.Type == store.HCRevert}: default: } } @@ -550,6 +542,22 @@ func TestEthGetLogsAll(t *testing.T) { ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address) require.NoError(err) + return ethContractAddr, received +} + +func TestEthGetLogsAll(t *testing.T) { + require := require.New(t) + + kit.QuietMiningLogs() + + blockTime := 100 * time.Millisecond + dbpath := filepath.Join(t.TempDir(), "actorevents.db") + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath)) + ens.InterconnectAll().BeginMining(blockTime) + + ethContractAddr, received := invokeContractAndWaitUntilAllOnChain(t, client, 10) + topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11})) topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22})) topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33})) @@ -558,8 +566,8 @@ func TestEthGetLogsAll(t *testing.T) { pstring := func(s string) *string { return &s } - // get logs - res, err := client.EthGetLogs(ctx, &api.EthFilterSpec{ + // get all logs + res, err := client.EthGetLogs(context.Background(), &api.EthFilterSpec{ FromBlock: pstring("0x0"), }) require.NoError(err) @@ -600,6 +608,70 @@ func TestEthGetLogsAll(t *testing.T) { } } +func TestEthGetLogsByTopic(t *testing.T) { + require := require.New(t) + + kit.QuietMiningLogs() + + blockTime := 100 * time.Millisecond + dbpath := filepath.Join(t.TempDir(), "actorevents.db") + + client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath)) + ens.InterconnectAll().BeginMining(blockTime) + + invocations := 1 + + ethContractAddr, received := invokeContractAndWaitUntilAllOnChain(t, client, invocations) + + topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11})) + topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22})) + topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33})) + topic4 := api.EthBytes(leftpad32([]byte{0x44, 0x44})) + data1 := api.EthBytes(leftpad32([]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88})) + + // find log by known topic1 + var spec api.EthFilterSpec + err := json.Unmarshal([]byte(`{"fromBlock":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000001111"]}`), &spec) + require.NoError(err) + + res, err := client.EthGetLogs(context.Background(), &spec) + require.NoError(err) + + require.Equal(invocations, len(res.Results)) + + for _, r := range res.Results { + // since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results + rc, ok := r.(map[string]interface{}) + require.True(ok, "result type") + + elog, err := ParseEthLog(rc) + require.NoError(err) + + require.Equal(ethContractAddr, elog.Address, "event address") + require.Equal(api.EthUint64(0), elog.TransactionIndex, "transaction index") // only one message per tipset + + msg, exists := received[elog.TransactionHash] + require.True(exists, "message seen on chain") + + tsCid, err := msg.ts.Key().Cid() + require.NoError(err) + + tsCidHash, err := api.NewEthHashFromCid(tsCid) + require.NoError(err) + + require.Equal(tsCidHash, elog.BlockHash, "block hash") + + require.Equal(4, len(elog.Topics), "number of topics") + require.Equal(topic1, elog.Topics[0], "topic1") + require.Equal(topic2, elog.Topics[1], "topic2") + require.Equal(topic3, elog.Topics[2], "topic3") + require.Equal(topic4, elog.Topics[3], "topic4") + + require.Equal(data1, elog.Data, "data1") + + } +} + func TestEthSubscribeLogs(t *testing.T) { require := require.New(t) diff --git a/itests/fevm_events_test.go b/itests/fevm_events_test.go index 4a16dc38f23..30dd7015f78 100644 --- a/itests/fevm_events_test.go +++ b/itests/fevm_events_test.go @@ -29,6 +29,7 @@ func TestFEVMEvents(t *testing.T) { defer cancel() // install contract + // See https://github.com/filecoin-project/builtin-actors/blob/next/actors/evm/tests/events.rs#L12 contractHex, err := os.ReadFile("contracts/events.bin") require.NoError(err) @@ -65,17 +66,18 @@ func TestFEVMEvents(t *testing.T) { ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x00}, nil) require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") require.NotNil(ret.Receipt.EventsRoot) - fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) + fmt.Println(ret) + fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) // log a zero topic event with no data ret = client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x01}, nil) require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") fmt.Println(ret) - fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) + fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) // log a four topic event with data ret = client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil) require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed") fmt.Println(ret) - fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) + fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot)) }