Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix getting event logs by topic #9819

Merged
merged 1 commit into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 48 additions & 16 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package filter

import (
"bytes"
"context"
"database/sql"
"errors"
Expand All @@ -10,6 +11,7 @@ import (

"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -151,6 +153,10 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return xerrors.Errorf("prepare insert entry: %w", err)
}

isIndexedValue := func(b uint8) bool {
return b&types.EventFlagIndexedValue == types.EventFlagIndexedValue
}

for msgIdx, em := range ems {
for evIdx, ev := range em.Events() {
addr, found := addressLookups[ev.Emitter]
Expand Down Expand Up @@ -189,12 +195,13 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}

for _, entry := range ev.Entries {
value := decodeLogBytes(entry.Value)
_, err := stmtEntry.Exec(
lastID, // event_id
entry.Flags&indexed == indexed, // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
entry.Value, // value
lastID, // event_id
isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
value, // value
)
if err != nil {
return xerrors.Errorf("exec insert entry: %w", err)
Expand All @@ -210,6 +217,21 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}

// decodeLogBytes decodes a CBOR-serialized array into its original form.
//
// This function swallows errors and returns the original array if it failed
// to decode.
func decodeLogBytes(orig []byte) []byte {
if orig == nil {
return orig
}
decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig)))
if err != nil {
return orig
}
return decoded
}

// PrefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
clauses := []string{}
Expand Down Expand Up @@ -242,18 +264,19 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
if len(f.keys) > 0 {
join := 0
for key, vals := range f.keys {
join++
joinAlias := fmt.Sprintf("ee%d", join)
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias))
values = append(values, key)
subclauses := []string{}
for _, val := range vals {
subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias))
values = append(values, val)
if len(vals) > 0 {
join++
joinAlias := fmt.Sprintf("ee%d", join)
joins = append(joins, fmt.Sprintf("event_entry %s on event.id=%[1]s.event_id", joinAlias))
clauses = append(clauses, fmt.Sprintf("%s.indexed=1 AND %[1]s.key=?", joinAlias))
values = append(values, key)
subclauses := []string{}
for _, val := range vals {
subclauses = append(subclauses, fmt.Sprintf("%s.value=?", joinAlias))
values = append(values, trimLeadingZeros(val))
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")
}
clauses = append(clauses, "("+strings.Join(subclauses, " OR ")+")")

}
}

Expand Down Expand Up @@ -397,3 +420,12 @@ func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {

return nil
}

func trimLeadingZeros(b []byte) []byte {
for i := range b {
if b[i] != 0 {
return b[i:]
}
}
return []byte{}
}
6 changes: 6 additions & 0 deletions chain/types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,9 @@ type EventEntry struct {
}

type FilterID [32]byte // compatible with EthHash

// EventEntry flags defined in fvm_shared
const (
EventFlagIndexedKey = 0b00000001
EventFlagIndexedValue = 0b00000010
)
110 changes: 91 additions & 19 deletions itests/eth_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package itests
import (
"context"
"encoding/hex"
"encoding/json"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -446,16 +447,14 @@ func ParseEthLog(in map[string]interface{}) (*api.EthLog, error) {
return el, err
}

func TestEthGetLogsAll(t *testing.T) {
require := require.New(t)

kit.QuietMiningLogs()

blockTime := 100 * time.Millisecond
dbpath := filepath.Join(t.TempDir(), "actorevents.db")
type msgInTipset struct {
msg api.Message
ts *types.TipSet
reverted bool
}

client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath))
ens.InterconnectAll().BeginMining(blockTime)
func invokeContractAndWaitUntilAllOnChain(t *testing.T, client *kit.TestFullNode, iterations int) (api.EthAddress, map[api.EthHash]msgInTipset) {
require := require.New(t)

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand All @@ -476,13 +475,6 @@ func TestEthGetLogsAll(t *testing.T) {
require.NoError(err)
t.Logf("actor ID address is %s", idAddr)

const iterations = 10

type msgInTipset struct {
msg api.Message
ts *types.TipSet
}

msgChan := make(chan msgInTipset, iterations)

waitAllCh := make(chan struct{})
Expand All @@ -503,7 +495,7 @@ func TestEthGetLogsAll(t *testing.T) {
count += len(msgs)
for _, m := range msgs {
select {
case msgChan <- msgInTipset{msg: m, ts: change.Val}:
case msgChan <- msgInTipset{msg: m, ts: change.Val, reverted: change.Type == store.HCRevert}:
default:
}
}
Expand Down Expand Up @@ -550,6 +542,22 @@ func TestEthGetLogsAll(t *testing.T) {
ethContractAddr, err := api.EthAddressFromFilecoinAddress(*actor.Address)
require.NoError(err)

return ethContractAddr, received
}

func TestEthGetLogsAll(t *testing.T) {
require := require.New(t)

kit.QuietMiningLogs()

blockTime := 100 * time.Millisecond
dbpath := filepath.Join(t.TempDir(), "actorevents.db")

client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath))
ens.InterconnectAll().BeginMining(blockTime)

ethContractAddr, received := invokeContractAndWaitUntilAllOnChain(t, client, 10)

topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11}))
topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22}))
topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33}))
Expand All @@ -558,8 +566,8 @@ func TestEthGetLogsAll(t *testing.T) {

pstring := func(s string) *string { return &s }

// get logs
res, err := client.EthGetLogs(ctx, &api.EthFilterSpec{
// get all logs
res, err := client.EthGetLogs(context.Background(), &api.EthFilterSpec{
FromBlock: pstring("0x0"),
})
require.NoError(err)
Expand Down Expand Up @@ -600,6 +608,70 @@ func TestEthGetLogsAll(t *testing.T) {
}
}

func TestEthGetLogsByTopic(t *testing.T) {
require := require.New(t)

kit.QuietMiningLogs()

blockTime := 100 * time.Millisecond
dbpath := filepath.Join(t.TempDir(), "actorevents.db")

client, _, ens := kit.EnsembleMinimal(t, kit.MockProofs(), kit.ThroughRPC(), kit.HistoricFilterAPI(dbpath))
ens.InterconnectAll().BeginMining(blockTime)

invocations := 1

ethContractAddr, received := invokeContractAndWaitUntilAllOnChain(t, client, invocations)

topic1 := api.EthBytes(leftpad32([]byte{0x11, 0x11}))
topic2 := api.EthBytes(leftpad32([]byte{0x22, 0x22}))
topic3 := api.EthBytes(leftpad32([]byte{0x33, 0x33}))
topic4 := api.EthBytes(leftpad32([]byte{0x44, 0x44}))
data1 := api.EthBytes(leftpad32([]byte{0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88}))

// find log by known topic1
var spec api.EthFilterSpec
err := json.Unmarshal([]byte(`{"fromBlock":"0x0","topics":["0x0000000000000000000000000000000000000000000000000000000000001111"]}`), &spec)
require.NoError(err)

res, err := client.EthGetLogs(context.Background(), &spec)
require.NoError(err)

require.Equal(invocations, len(res.Results))

for _, r := range res.Results {
// since response is a union and Go doesn't support them well, go-jsonrpc won't give us typed results
rc, ok := r.(map[string]interface{})
require.True(ok, "result type")

elog, err := ParseEthLog(rc)
require.NoError(err)

require.Equal(ethContractAddr, elog.Address, "event address")
require.Equal(api.EthUint64(0), elog.TransactionIndex, "transaction index") // only one message per tipset

msg, exists := received[elog.TransactionHash]
require.True(exists, "message seen on chain")

tsCid, err := msg.ts.Key().Cid()
require.NoError(err)

tsCidHash, err := api.NewEthHashFromCid(tsCid)
require.NoError(err)

require.Equal(tsCidHash, elog.BlockHash, "block hash")

require.Equal(4, len(elog.Topics), "number of topics")
require.Equal(topic1, elog.Topics[0], "topic1")
require.Equal(topic2, elog.Topics[1], "topic2")
require.Equal(topic3, elog.Topics[2], "topic3")
require.Equal(topic4, elog.Topics[3], "topic4")

require.Equal(data1, elog.Data, "data1")

}
}

func TestEthSubscribeLogs(t *testing.T) {
require := require.New(t)

Expand Down
8 changes: 5 additions & 3 deletions itests/fevm_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestFEVMEvents(t *testing.T) {
defer cancel()

// install contract
// See https://github.com/filecoin-project/builtin-actors/blob/next/actors/evm/tests/events.rs#L12
contractHex, err := os.ReadFile("contracts/events.bin")
require.NoError(err)

Expand Down Expand Up @@ -65,17 +66,18 @@ func TestFEVMEvents(t *testing.T) {
ret := client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x00}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
require.NotNil(ret.Receipt.EventsRoot)
fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))
fmt.Println(ret)
fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))

// log a zero topic event with no data
ret = client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x01}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
fmt.Println(ret)
fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))
fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))

// log a four topic event with data
ret = client.EVM().InvokeSolidity(ctx, fromAddr, idAddr, []byte{0x00, 0x00, 0x00, 0x02}, nil)
require.True(ret.Receipt.ExitCode.IsSuccess(), "contract execution failed")
fmt.Println(ret)
fmt.Println(client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))
fmt.Printf("Events:\n %+v\n", client.EVM().LoadEvents(ctx, *ret.Receipt.EventsRoot))
}