Skip to content

Commit

Permalink
chore: Avoid ingesting binary and unused data (#241)
Browse files Browse the repository at this point in the history
* chore: remove ingestion for binary and unused data

- Remove the following tables:
  - public.drand_entries
- Remove the Following columns:
  - public.block_headers ticket
  - public.block_headers election_proof
  - public.messages params
  - public.receipts return

closes #204

* fix: Reorder migration to merge to master

Co-authored-by: Mike Greenberg <[email protected]>
  • Loading branch information
frrist and placer14 authored Nov 19, 2020
1 parent 6ec7b56 commit d588dea
Show file tree
Hide file tree
Showing 12 changed files with 54 additions and 104 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -595,6 +595,7 @@ github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0
github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM=
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.2.1/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3zMo=
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
Expand Down
47 changes: 0 additions & 47 deletions model/blocks/drand.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,53 +11,6 @@ import (
"golang.org/x/xerrors"
)

type DrandEntrie struct {
Round uint64 `pg:",pk,use_zero"`
Data []byte `pg:",notnull"`
}

func (de *DrandEntrie) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, de).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting drand entries: %w", err)
}
return nil
}

func NewDrandEnties(header *types.BlockHeader) DrandEntries {
var out DrandEntries
for _, ent := range header.BeaconEntries {
out = append(out, &DrandEntrie{
Round: ent.Round,
Data: ent.Data,
})
}
return out
}

type DrandEntries []*DrandEntrie

func (des DrandEntries) Persist(ctx context.Context, db *pg.DB) error {
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
return des.PersistWithTx(ctx, tx)
})
}

func (des DrandEntries) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if len(des) == 0 {
return nil
}
ctx, span := global.Tracer("").Start(ctx, "DrandEntries.PersistWithTx", trace.WithAttributes(label.Int("count", len(des))))
defer span.End()
if _, err := tx.ModelContext(ctx, &des).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting drand entries: %w", err)
}
return nil
}

func NewDrandBlockEntries(header *types.BlockHeader) DrandBlockEntries {
var out DrandBlockEntries
for _, ent := range header.BeaconEntries {
Expand Down
5 changes: 0 additions & 5 deletions model/blocks/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ type BlockHeader struct {
WinCount int64 `pg:",use_zero"`
Timestamp uint64 `pg:",use_zero"`
ForkSignaling uint64 `pg:",use_zero"`

Ticket []byte
ElectionProof []byte
}

func NewBlockHeader(bh *types.BlockHeader) *BlockHeader {
Expand All @@ -38,8 +35,6 @@ func NewBlockHeader(bh *types.BlockHeader) *BlockHeader {
WinCount: bh.ElectionProof.WinCount,
Timestamp: bh.Timestamp,
ForkSignaling: bh.ForkSignaling,
Ticket: bh.Ticket.VRFProof,
ElectionProof: bh.ElectionProof.VRFProof,
}
}

Expand Down
2 changes: 0 additions & 2 deletions model/messages/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ type Message struct {
SizeBytes int `pg:",use_zero"`
Nonce uint64 `pg:",use_zero"`
Method uint64 `pg:",use_zero"`

Params []byte
}

func (m *Message) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
Expand Down
2 changes: 0 additions & 2 deletions model/messages/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ type Receipt struct {
Idx int `pg:",use_zero"`
ExitCode int64 `pg:",use_zero"`
GasUsed int64 `pg:",use_zero"`

Return []byte
}

func (r *Receipt) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
Expand Down
47 changes: 47 additions & 0 deletions storage/migrations/20_avoid_binary_ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package migrations

import (
"github.com/go-pg/migrations/v8"
)

// Schema version 20 removes unused binary data

func init() {
up := batch(`
DROP TABLE IF EXISTS public.drand_entries;
-- view depends on ticket and election_proof
DROP VIEW IF EXISTS chain_visualizer_blocks_view;
ALTER TABLE public.block_headers DROP COLUMN ticket;
ALTER TABLE public.block_headers DROP COLUMN election_proof;
CREATE VIEW chain_visualizer_blocks_view AS
SELECT * FROM block_headers
;
ALTER TABLE public.messages DROP COLUMN params;
ALTER TABLE public.receipts DROP COLUMN return;
`)

down := batch(`
CREATE TABLE public.drand_entries (
round bigint NOT NULL,
data bytea NOT NULL
);
-- view depends on ticket and election_proof
DROP VIEW IF EXISTS chain_visualizer_blocks_view;
ALTER TABLE public.block_headers ADD COLUMN ticket bytea;
ALTER TABLE public.block_headers ADD COLUMN election_proof bytea;
CREATE VIEW chain_visualizer_blocks_view AS
SELECT * FROM block_headers
;
ALTER TABLE public.messages ADD COLUMN params bytea;
ALTER TABLE public.receipts ADD COLUMN return bytea;
`)

migrations.MustRegisterTx(up, down)
}
2 changes: 0 additions & 2 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
var models = []interface{}{
(*blocks.BlockHeader)(nil),
(*blocks.BlockParent)(nil),

(*blocks.DrandEntrie)(nil),
(*blocks.DrandBlockEntrie)(nil),

(*miner.MinerSectorDeal)(nil),
Expand Down
2 changes: 0 additions & 2 deletions storage/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,6 @@ func TestLeaseGasOutputsMessages(t *testing.T) {
ParentWeight: "parentweight",
ParentBaseFee: "parentbasefee",
ParentStateRoot: "parentstateroot",
Ticket: []byte("ticket"),
}
}

Expand Down Expand Up @@ -784,7 +783,6 @@ func TestFindGasOutputsMessages(t *testing.T) {
ParentWeight: "parentweight",
ParentBaseFee: "parentbasefee",
ParentStateRoot: "parentstateroot",
Ticket: []byte("ticket"),
}
}

Expand Down
7 changes: 0 additions & 7 deletions tasks/indexer/blockdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type UnindexedBlockData struct {
height abi.ChainEpoch
blks blocks.BlockHeaders
parents blocks.BlockParents
drandEntries blocks.DrandEntries
drandBlockEntries blocks.DrandBlockEntries
tipsets visor.ProcessingTipSetList
}
Expand All @@ -44,7 +43,6 @@ func (u *UnindexedBlockData) AddTipSet(ts *types.TipSet) {
func (u *UnindexedBlockData) AddBlock(bh *types.BlockHeader) {
u.blks = append(u.blks, blocks.NewBlockHeader(bh))
u.parents = append(u.parents, blocks.NewBlockParents(bh)...)
u.drandEntries = append(u.drandEntries, blocks.NewDrandEnties(bh)...)
u.drandBlockEntries = append(u.drandBlockEntries, blocks.NewDrandBlockEntries(bh)...)
}

Expand All @@ -64,10 +62,6 @@ func (u *UnindexedBlockData) Persist(ctx context.Context, db *pg.DB) error {
return xerrors.Errorf("persist block parents: %w", err)
}

if err := u.drandEntries.PersistWithTx(ctx, tx); err != nil {
return xerrors.Errorf("persist drand entries: %w", err)
}

if err := u.drandBlockEntries.PersistWithTx(ctx, tx); err != nil {
return xerrors.Errorf("persist drand block entries: %w", err)
}
Expand Down Expand Up @@ -100,7 +94,6 @@ func (u *UnindexedBlockData) MarkSeen(tsk types.TipSetKey) {
func (u *UnindexedBlockData) Reset() {
u.blks = u.blks[:0]
u.parents = u.parents[:0]
u.drandEntries = u.drandEntries[:0]
u.drandBlockEntries = u.drandBlockEntries[:0]
u.tipsets = u.tipsets[:0]
}
15 changes: 0 additions & 15 deletions tasks/indexer/chainheadindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,6 @@ func TestChainHeadIndexer(t *testing.T) {
assert.True(t, exists, "block: %s", cid)
}
})

t.Run("drand_entries", func(t *testing.T) {
var count int
_, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_entries`)
require.NoError(t, err)
assert.Equal(t, len(rounds), count)

var m *blocks.DrandEntrie
for _, round := range rounds {
exists, err := db.Model(m).Where("round = ?", round).Exists()
require.NoError(t, err)
assert.True(t, exists, "round: %d", round)
}
})

t.Run("drand_block_entries", func(t *testing.T) {
var count int
_, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_block_entries`)
Expand Down
14 changes: 0 additions & 14 deletions tasks/indexer/chainhistoryindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,6 @@ func TestChainHistoryIndexer(t *testing.T) {
}
})

t.Run("drand_entries", func(t *testing.T) {
var count int
_, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_entries`)
require.NoError(t, err)
assert.Equal(t, len(rounds), count)

var m *blocks.DrandEntrie
for _, round := range rounds {
exists, err := db.Model(m).Where("round = ?", round).Exists()
require.NoError(t, err)
assert.True(t, exists, "round: %d", round)
}
})

t.Run("drand_block_entries", func(t *testing.T) {
var count int
_, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_block_entries`)
Expand Down
14 changes: 6 additions & 8 deletions tasks/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ func (p *MessageProcessor) extractMessageModels(ctx context.Context, node lens.A
SizeBytes: msgSize,
Nonce: message.Nonce,
Method: uint64(message.Method),
Params: message.Params,
}
result.Messages = append(result.Messages, msg)

Expand Down Expand Up @@ -344,7 +343,7 @@ func (p *MessageProcessor) extractMessageModels(ctx context.Context, node lens.A
dstActorCode = dstActor.Code.String()
}

if pm, err := parseMsg(msg, ts, dstActorCode); err == nil {
if pm, err := parseMsg(message, ts, dstActorCode); err == nil {
result.ParsedMessages = append(result.ParsedMessages, pm)
} else {
return nil, nil, xerrors.Errorf("parse message %s failed: %w", message.Cid().String(), err)
Expand Down Expand Up @@ -386,13 +385,13 @@ func cidsEqual(c1, c2 []cid.Cid) bool {
return true
}

func parseMsg(m *messagemodel.Message, ts *types.TipSet, destCode string) (*messagemodel.ParsedMessage, error) {
func parseMsg(m *types.Message, ts *types.TipSet, destCode string) (*messagemodel.ParsedMessage, error) {
pm := &messagemodel.ParsedMessage{
Cid: m.Cid,
Cid: m.Cid().String(),
Height: int64(ts.Height()),
From: m.From,
To: m.To,
Value: m.Value,
From: m.From.String(),
To: m.To.String(),
Value: m.Value.String(),
}

actor, ok := statediff.LotusActorCodes[destCode]
Expand Down Expand Up @@ -469,7 +468,6 @@ func (p *MessageProcessor) fetchReceipts(ctx context.Context, node lens.API, ts
Idx: i,
ExitCode: int64(r.ExitCode),
GasUsed: r.GasUsed,
Return: r.Return,
})
}
}
Expand Down

0 comments on commit d588dea

Please sign in to comment.