Skip to content

Commit

Permalink
fix: event filters use cbor encoding internally (#10085)
Browse files Browse the repository at this point in the history
  • Loading branch information
iand authored Jan 23, 2023
1 parent c5ed5dd commit ddd5ff9
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 64 deletions.
23 changes: 5 additions & 18 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,18 +105,18 @@ func (f *EventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
continue
}

decodedEntries := make([]types.EventEntry, len(ev.Entries))
entries := make([]types.EventEntry, len(ev.Entries))
for i, entry := range ev.Entries {
decodedEntries[i] = types.EventEntry{
entries[i] = types.EventEntry{
Flags: entry.Flags,
Key: entry.Key,
Value: decodeLogBytes(entry.Value),
Value: entry.Value,
}
}

// event matches filter, so record it
cev := &CollectedEvent{
Entries: decodedEntries,
Entries: entries,
EmitterAddr: addr,
EventIdx: evIdx,
Reverted: revert,
Expand Down Expand Up @@ -231,10 +231,7 @@ func (f *EventFilter) matchKeys(ees []types.EventEntry) bool {
}

for _, w := range wantlist {
// TODO: remove this. Currently the filters use raw values but the value in the entry is cbor-encoded
// We want to make all internal values cbor-encoded as per https://github.com/filecoin-project/ref-fvm/issues/1345
value := leftpad32(decodeLogBytes(ee.Value))
if bytes.Equal(w, value) {
if bytes.Equal(w, ee.Value) {
matched[keyname] = true
break
}
Expand Down Expand Up @@ -489,13 +486,3 @@ func (m *EventFilterManager) loadExecutedMessages(ctx context.Context, msgTs, rc

return ems, nil
}

func leftpad32(orig []byte) []byte {
needed := 32 - len(orig)
if needed <= 0 {
return orig
}
ret := make([]byte, 32)
copy(ret[needed:], orig)
return ret
}
20 changes: 1 addition & 19 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package filter

import (
"bytes"
"context"
"database/sql"
"errors"
Expand All @@ -11,7 +10,6 @@ import (

"github.com/ipfs/go-cid"
_ "github.com/mattn/go-sqlite3"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -191,13 +189,12 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}

for _, entry := range ev.Entries {
value := decodeLogBytes(entry.Value)
_, err := stmtEntry.Exec(
lastID, // event_id
isIndexedValue(entry.Flags), // indexed
[]byte{entry.Flags}, // flags
entry.Key, // key
value, // value
entry.Value, // value
)
if err != nil {
return xerrors.Errorf("exec insert entry: %w", err)
Expand All @@ -213,21 +210,6 @@ func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}

// decodeLogBytes decodes a CBOR-serialized array into its original form.
//
// This function swallows errors and returns the original array if it failed
// to decode.
func decodeLogBytes(orig []byte) []byte {
if len(orig) == 0 {
return orig
}
decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig)))
if err != nil {
return orig
}
return decoded
}

// PrefillFilter fills a filter's collection of events from the historic index
func (ei *EventIndex) PrefillFilter(ctx context.Context, f *EventFilter) error {
clauses := []string{}
Expand Down
88 changes: 61 additions & 27 deletions node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -980,17 +980,9 @@ func (e *EthEvent) installEthFilterSpec(ctx context.Context, filterSpec *ethtype
addresses = append(addresses, a)
}

for idx, vals := range filterSpec.Topics {
if len(vals) == 0 {
continue
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
for _, v := range vals {
buf := make([]byte, len(v[:]))
copy(buf, v[:])
keys[key] = append(keys[key], buf)
}
keys, err := parseEthTopics(filterSpec.Topics)
if err != nil {
return nil, err
}

return e.EventFilterManager.Install(ctx, minHeight, maxHeight, tipsetCid, addresses, keys)
Expand Down Expand Up @@ -1141,17 +1133,12 @@ func (e *EthEvent) EthSubscribe(ctx context.Context, eventType string, params *e
case EthSubscribeEventTypeLogs:
keys := map[string][][]byte{}
if params != nil {
for idx, vals := range params.Topics {
if len(vals) == 0 {
continue
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
for _, v := range vals {
buf := make([]byte, len(v[:]))
copy(buf, v[:])
keys[key] = append(keys[key], buf)
}
var err error
keys, err = parseEthTopics(params.Topics)
if err != nil {
// clean up any previous filters added and stop the sub
_, _ = e.EthUnsubscribe(ctx, sub.id)
return nil, err
}
}

Expand Down Expand Up @@ -1238,7 +1225,10 @@ func ethFilterResultFromEvents(evs []*filter.CollectedEvent, sa StateAPI) (*etht
var err error

for _, entry := range ev.Entries {
value := ethtypes.EthBytes(leftpad32(entry.Value)) // value has already been cbor-decoded but see https://github.com/filecoin-project/ref-fvm/issues/1345
value, err := cborDecodeTopicValue(entry.Value)
if err != nil {
return nil, err
}
if entry.Key == ethtypes.EthTopic1 || entry.Key == ethtypes.EthTopic2 || entry.Key == ethtypes.EthTopic3 || entry.Key == ethtypes.EthTopic4 {
log.Topics = append(log.Topics, value)
} else {
Expand Down Expand Up @@ -1892,10 +1882,6 @@ func EthTxHashGC(ctx context.Context, retentionDays int, manager *EthTxHashManag
}
}

// TODO we could also emit full EVM words from the EVM runtime, but not doing so
// makes the contract slightly cheaper (and saves storage bytes), at the expense
// of having to left pad in the API, which is a pretty acceptable tradeoff at
// face value. There may be other protocol implications to consider.
func leftpad32(orig []byte) []byte {
needed := 32 - len(orig)
if needed <= 0 {
Expand All @@ -1905,3 +1891,51 @@ func leftpad32(orig []byte) []byte {
copy(ret[needed:], orig)
return ret
}

func trimLeadingZeros(b []byte) []byte {
for i := range b {
if b[i] != 0 {
return b[i:]
}
}
return []byte{}
}

func cborEncodeTopicValue(orig []byte) ([]byte, error) {
var buf bytes.Buffer
err := cbg.WriteByteArray(&buf, trimLeadingZeros(orig))
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func cborDecodeTopicValue(orig []byte) ([]byte, error) {
if len(orig) == 0 {
return orig, nil
}
decoded, err := cbg.ReadByteArray(bytes.NewReader(orig), uint64(len(orig)))
if err != nil {
return nil, err
}
return leftpad32(decoded), nil
}

func parseEthTopics(topics ethtypes.EthTopicSpec) (map[string][][]byte, error) {
keys := map[string][][]byte{}
for idx, vals := range topics {
if len(vals) == 0 {
continue
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
for _, v := range vals {
encodedVal, err := cborEncodeTopicValue(v[:])
if err != nil {
return nil, xerrors.Errorf("failed to encode topic value")
}
keys[key] = append(keys[key], encodedVal)
}
}
return keys, nil
}

0 comments on commit ddd5ff9

Please sign in to comment.