Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Avoid ingesting binary and unused data #241

Merged
merged 3 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,7 @@ github.com/ipfs/go-fs-lock v0.0.6 h1:sn3TWwNVQqSeNjlWy6zQ1uUGAZrV3hPOyEA6y1/N2a0
github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28L7zESmM=
github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE=
github.com/ipfs/go-graphsync v0.2.1/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10=
github.com/ipfs/go-graphsync v0.4.2 h1:Y/jt5r619yj0LI7OLtGKh4jYm8goYUcuJ09y7TZ3zMo=
github.com/ipfs/go-graphsync v0.4.2/go.mod h1:/VmbZTUdUMTbNkgzAiCEucIIAU3BkLE2cZrDCVUhyi0=
github.com/ipfs/go-graphsync v0.4.3/go.mod h1:mPOwDYv128gf8gxPFgXnz4fNrSYPsWyqisJ7ych+XDY=
github.com/ipfs/go-graphsync v0.5.0 h1:iaByvxq88Ys1KcaQzTS1wmRhNsNEo3SaUiSGqTSbGmM=
Expand Down
47 changes: 0 additions & 47 deletions model/blocks/drand.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,53 +11,6 @@ import (
"golang.org/x/xerrors"
)

type DrandEntrie struct {
Round uint64 `pg:",pk,use_zero"`
Data []byte `pg:",notnull"`
}

func (de *DrandEntrie) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, de).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting drand entries: %w", err)
}
return nil
}

func NewDrandEnties(header *types.BlockHeader) DrandEntries {
var out DrandEntries
for _, ent := range header.BeaconEntries {
out = append(out, &DrandEntrie{
Round: ent.Round,
Data: ent.Data,
})
}
return out
}

type DrandEntries []*DrandEntrie

func (des DrandEntries) Persist(ctx context.Context, db *pg.DB) error {
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
return des.PersistWithTx(ctx, tx)
})
}

func (des DrandEntries) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if len(des) == 0 {
return nil
}
ctx, span := global.Tracer("").Start(ctx, "DrandEntries.PersistWithTx", trace.WithAttributes(label.Int("count", len(des))))
defer span.End()
if _, err := tx.ModelContext(ctx, &des).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting drand entries: %w", err)
}
return nil
}

func NewDrandBlockEntries(header *types.BlockHeader) DrandBlockEntries {
var out DrandBlockEntries
for _, ent := range header.BeaconEntries {
Expand Down
5 changes: 0 additions & 5 deletions model/blocks/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ type BlockHeader struct {
WinCount int64 `pg:",use_zero"`
Timestamp uint64 `pg:",use_zero"`
ForkSignaling uint64 `pg:",use_zero"`

Ticket []byte
ElectionProof []byte
}

func NewBlockHeader(bh *types.BlockHeader) *BlockHeader {
Expand All @@ -38,8 +35,6 @@ func NewBlockHeader(bh *types.BlockHeader) *BlockHeader {
WinCount: bh.ElectionProof.WinCount,
Timestamp: bh.Timestamp,
ForkSignaling: bh.ForkSignaling,
Ticket: bh.Ticket.VRFProof,
ElectionProof: bh.ElectionProof.VRFProof,
}
}

Expand Down
2 changes: 0 additions & 2 deletions model/messages/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ type Message struct {
SizeBytes int `pg:",use_zero"`
Nonce uint64 `pg:",use_zero"`
Method uint64 `pg:",use_zero"`

Params []byte
}

func (m *Message) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
Expand Down
2 changes: 0 additions & 2 deletions model/messages/receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ type Receipt struct {
Idx int `pg:",use_zero"`
ExitCode int64 `pg:",use_zero"`
GasUsed int64 `pg:",use_zero"`

Return []byte
}

func (r *Receipt) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
Expand Down
46 changes: 46 additions & 0 deletions storage/migrations/19_avoid_binary_ingest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package migrations

import (
"github.com/go-pg/migrations/v8"
)

// Schema version 19 removes unused binary data

func init() {
up := batch(`
DROP TABLE IF EXISTS public.drand_entries;

-- view depends on ticket and election_proof
DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_blocks_view;
ALTER TABLE public.block_headers DROP COLUMN ticket;
ALTER TABLE public.block_headers DROP COLUMN election_proof;
CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_blocks_view AS
SELECT * FROM block_headers
WITH NO DATA;

ALTER TABLE public.messages DROP COLUMN params;

ALTER TABLE public.receipts DROP COLUMN return;
`)

down := batch(`
CREATE TABLE public.drand_entries (
round bigint NOT NULL,
data bytea NOT NULL
);

-- view depends on ticket and election_proof
DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_blocks_view;
ALTER TABLE public.block_headers ADD COLUMN ticket bytea;
ALTER TABLE public.block_headers ADD COLUMN election_proof bytea;
CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_blocks_view AS
SELECT * FROM block_headers
WITH NO DATA;

ALTER TABLE public.messages ADD COLUMN params bytea;

ALTER TABLE public.receipts ADD COLUMN return bytea;
`)

migrations.MustRegisterTx(up, down)
}
2 changes: 0 additions & 2 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ import (
var models = []interface{}{
(*blocks.BlockHeader)(nil),
(*blocks.BlockParent)(nil),

(*blocks.DrandEntrie)(nil),
(*blocks.DrandBlockEntrie)(nil),

(*miner.MinerSectorDeal)(nil),
Expand Down
2 changes: 0 additions & 2 deletions storage/sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,6 @@ func TestLeaseGasOutputsMessages(t *testing.T) {
ParentWeight: "parentweight",
ParentBaseFee: "parentbasefee",
ParentStateRoot: "parentstateroot",
Ticket: []byte("ticket"),
}
}

Expand Down Expand Up @@ -784,7 +783,6 @@ func TestFindGasOutputsMessages(t *testing.T) {
ParentWeight: "parentweight",
ParentBaseFee: "parentbasefee",
ParentStateRoot: "parentstateroot",
Ticket: []byte("ticket"),
}
}

Expand Down
7 changes: 0 additions & 7 deletions tasks/indexer/blockdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type UnindexedBlockData struct {
height abi.ChainEpoch
blks blocks.BlockHeaders
parents blocks.BlockParents
drandEntries blocks.DrandEntries
drandBlockEntries blocks.DrandBlockEntries
tipsets visor.ProcessingTipSetList
}
Expand All @@ -44,7 +43,6 @@ func (u *UnindexedBlockData) AddTipSet(ts *types.TipSet) {
func (u *UnindexedBlockData) AddBlock(bh *types.BlockHeader) {
u.blks = append(u.blks, blocks.NewBlockHeader(bh))
u.parents = append(u.parents, blocks.NewBlockParents(bh)...)
u.drandEntries = append(u.drandEntries, blocks.NewDrandEnties(bh)...)
u.drandBlockEntries = append(u.drandBlockEntries, blocks.NewDrandBlockEntries(bh)...)
}

Expand All @@ -64,10 +62,6 @@ func (u *UnindexedBlockData) Persist(ctx context.Context, db *pg.DB) error {
return xerrors.Errorf("persist block parents: %w", err)
}

if err := u.drandEntries.PersistWithTx(ctx, tx); err != nil {
return xerrors.Errorf("persist drand entries: %w", err)
}

if err := u.drandBlockEntries.PersistWithTx(ctx, tx); err != nil {
return xerrors.Errorf("persist drand block entries: %w", err)
}
Expand Down Expand Up @@ -100,7 +94,6 @@ func (u *UnindexedBlockData) MarkSeen(tsk types.TipSetKey) {
func (u *UnindexedBlockData) Reset() {
u.blks = u.blks[:0]
u.parents = u.parents[:0]
u.drandEntries = u.drandEntries[:0]
u.drandBlockEntries = u.drandBlockEntries[:0]
u.tipsets = u.tipsets[:0]
}
15 changes: 0 additions & 15 deletions tasks/indexer/chainheadindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,21 +115,6 @@ func TestChainHeadIndexer(t *testing.T) {
assert.True(t, exists, "block: %s", cid)
}
})

t.Run("drand_entries", func(t *testing.T) {
var count int
_, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_entries`)
require.NoError(t, err)
assert.Equal(t, len(rounds), count)

var m *blocks.DrandEntrie
for _, round := range rounds {
exists, err := db.Model(m).Where("round = ?", round).Exists()
require.NoError(t, err)
assert.True(t, exists, "round: %d", round)
}
})

t.Run("drand_block_entries", func(t *testing.T) {
var count int
_, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_block_entries`)
Expand Down
14 changes: 0 additions & 14 deletions tasks/indexer/chainhistoryindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,6 @@ func TestChainHistoryIndexer(t *testing.T) {
}
})

t.Run("drand_entries", func(t *testing.T) {
var count int
_, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_entries`)
require.NoError(t, err)
assert.Equal(t, len(rounds), count)

var m *blocks.DrandEntrie
for _, round := range rounds {
exists, err := db.Model(m).Where("round = ?", round).Exists()
require.NoError(t, err)
assert.True(t, exists, "round: %d", round)
}
})

t.Run("drand_block_entries", func(t *testing.T) {
var count int
_, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_block_entries`)
Expand Down
14 changes: 6 additions & 8 deletions tasks/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,6 @@ func (p *MessageProcessor) extractMessageModels(ctx context.Context, node lens.A
SizeBytes: msgSize,
Nonce: message.Nonce,
Method: uint64(message.Method),
Params: message.Params,
}
result.Messages = append(result.Messages, msg)

Expand Down Expand Up @@ -344,7 +343,7 @@ func (p *MessageProcessor) extractMessageModels(ctx context.Context, node lens.A
dstActorCode = dstActor.Code.String()
}

if pm, err := parseMsg(msg, ts, dstActorCode); err == nil {
if pm, err := parseMsg(message, ts, dstActorCode); err == nil {
result.ParsedMessages = append(result.ParsedMessages, pm)
} else {
return nil, nil, xerrors.Errorf("parse message %s failed: %w", message.Cid().String(), err)
Expand Down Expand Up @@ -386,13 +385,13 @@ func cidsEqual(c1, c2 []cid.Cid) bool {
return true
}

func parseMsg(m *messagemodel.Message, ts *types.TipSet, destCode string) (*messagemodel.ParsedMessage, error) {
func parseMsg(m *types.Message, ts *types.TipSet, destCode string) (*messagemodel.ParsedMessage, error) {
pm := &messagemodel.ParsedMessage{
Cid: m.Cid,
Cid: m.Cid().String(),
Height: int64(ts.Height()),
From: m.From,
To: m.To,
Value: m.Value,
From: m.From.String(),
To: m.To.String(),
Value: m.Value.String(),
}

actor, ok := statediff.LotusActorCodes[destCode]
Expand Down Expand Up @@ -469,7 +468,6 @@ func (p *MessageProcessor) fetchReceipts(ctx context.Context, node lens.API, ts
Idx: i,
ExitCode: int64(r.ExitCode),
GasUsed: r.GasUsed,
Return: r.Return,
})
}
}
Expand Down