Skip to content

Commit

Permalink
feat: add message gas economy processing
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist authored and iand committed Oct 14, 2020
1 parent 7644b3e commit 8ed8183
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 27 deletions.
32 changes: 32 additions & 0 deletions model/messages/gaseconomy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package messages

import (
"context"

"github.com/go-pg/pg/v10"
"golang.org/x/xerrors"
)

type MessageGasEconomy struct {
tableName struct{} `pg:"message_gas_economy"`
StateRoot string `pg:",pk,notnull"`

BaseFee float64 `pg:",use_zero"`
BaseFeeChangeLog float64 `pg:",use_zero"`

GasLimitTotal int64 `pg:",use_zero"`
GasLimitUniqueTotal int64 `pg:",use_zero"`

GasFillRatio float64 `pg:",use_zero"`
GasCapacityRatio float64 `pg:",use_zero"`
GasWasteRatio float64 `pg:",use_zero"`
}

func (g *MessageGasEconomy) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, g).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting derived gas economy: %w", err)
}
return nil
}
10 changes: 7 additions & 3 deletions model/messages/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
)

type MessageTaskResult struct {
Messages Messages
BlockMessages BlockMessages
Receipts Receipts
Messages Messages
BlockMessages BlockMessages
Receipts Receipts
MessageGasEconomy *MessageGasEconomy
}

func (mtr *MessageTaskResult) Persist(ctx context.Context, db *pg.DB) error {
Expand Down Expand Up @@ -41,6 +42,9 @@ func (mtr *MessageTaskResult) PersistWithTx(ctx context.Context, tx *pg.Tx) erro
if err := mtr.Receipts.PersistWithTx(ctx, tx); err != nil {
return err
}
if err := mtr.MessageGasEconomy.PersistWithTx(ctx, tx); err != nil {
return err
}

return nil
}
1 change: 1 addition & 0 deletions storage/migrations/0_null.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package migrations
26 changes: 26 additions & 0 deletions storage/migrations/8_message_gas_economy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package migrations

import (
"github.com/go-pg/migrations/v8"
)

func init() {
up := batch(`
CREATE TABLE public.message_gas_economy (
"state_root" text NOT NULL,
"gas_limit_total" double precision NOT NULL,
"gas_limit_unique_total "double precision NULL,
"base_fee" double precision NOT NULL,
"base_fee_change_log" double precision NOT NULL,
"gas_fill_ratio" double precision NULL,
"gas_capacity_ratio" double precision NULL,
"gas_waste_ratio" double precision NULL,
PRIMARY KEY ("state_root")
);
`)
down := batch(`
DROP TABLE IF EXISTS public.message_gas_economy;
`)

migrations.MustRegisterTx(up, down)
}
102 changes: 78 additions & 24 deletions tasks/message/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ package message

import (
"context"
"math"
"math/big"
"time"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/go-pg/pg/v10"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -140,7 +145,7 @@ func (p *MessageProcessor) processTipSet(ctx context.Context, ts *types.TipSet)

ll := log.With("height", int64(ts.Height()))

msgs, blkMsgs, processingMsgs, err := p.fetchMessages(ctx, ts)
blkMsgs, err := p.fetchMessages(ctx, ts)
if err != nil {
return xerrors.Errorf("fetch messages: %w", err)
}
Expand All @@ -150,10 +155,16 @@ func (p *MessageProcessor) processTipSet(ctx context.Context, ts *types.TipSet)
return xerrors.Errorf("fetch receipts: %w", err)
}

msgs, bmsgs, processingMsgs, econ, err := p.extractMessageModels(ctx, ts, blkMsgs)
if err != nil {
return xerrors.Errorf("extract message models: %w", err)
}

result := &messagemodel.MessageTaskResult{
Messages: msgs,
BlockMessages: blkMsgs,
Receipts: rcts,
Messages: msgs,
BlockMessages: bmsgs,
Receipts: rcts,
MessageGasEconomy: econ,
}

ll.Debugw("persisting tipset", "messages", len(msgs), "block_messages", len(blkMsgs), "receipts", len(rcts))
Expand All @@ -173,54 +184,75 @@ func (p *MessageProcessor) processTipSet(ctx context.Context, ts *types.TipSet)
return nil
}

func (p *MessageProcessor) fetchMessages(ctx context.Context, ts *types.TipSet) (messagemodel.Messages, messagemodel.BlockMessages, visor.ProcessingMessageList, error) {
msgs := messagemodel.Messages{}
bmsgs := messagemodel.BlockMessages{}
pmsgs := visor.ProcessingMessageList{}
msgsSeen := map[cid.Cid]struct{}{}

// TODO consider performing this work in parallel.
func (p *MessageProcessor) fetchMessages(ctx context.Context, ts *types.TipSet) (map[cid.Cid]*api.BlockMessages, error) {
out := make(map[cid.Cid]*api.BlockMessages)
for _, blk := range ts.Cids() {
// Stop processing if we have somehow passed our own lease time
select {
case <-ctx.Done():
return nil, nil, nil, ctx.Err()
return nil, ctx.Err()
default:
}

blkMsgs, err := p.node.ChainGetBlockMessages(ctx, blk)
if err != nil {
return nil, nil, nil, err
return nil, err
}
out[blk] = blkMsgs
}
return out, nil
}

vmm := make([]*types.Message, 0, len(blkMsgs.Cids))
for _, m := range blkMsgs.BlsMessages {
vmm = append(vmm, m)
func (p *MessageProcessor) extractMessageModels(ctx context.Context, ts *types.TipSet, blkMsgs map[cid.Cid]*api.BlockMessages) (messagemodel.Messages, messagemodel.BlockMessages, visor.ProcessingMessageList, *messagemodel.MessageGasEconomy, error) {
msgModels := messagemodel.Messages{}
bmsgModels := messagemodel.BlockMessages{}
pmsgModels := visor.ProcessingMessageList{}

msgsSeen := map[cid.Cid]struct{}{}
totalGasLimit := int64(0)
totalUniqGasLimit := int64(0)

for blk, msgs := range blkMsgs {
// Stop processing if we have somehow passed our own lease time
select {
case <-ctx.Done():
return nil, nil, nil, nil, ctx.Err()
default:
}

for _, m := range blkMsgs.SecpkMessages {
// extract all messages, vmm will include duplicate messages.
vmm := make([]*types.Message, 0, len(msgs.Cids))
for _, m := range msgs.BlsMessages {
vmm = append(vmm, m)
}
for _, m := range msgs.SecpkMessages {
vmm = append(vmm, &m.Message)
}

for _, message := range vmm {
bmsgs = append(bmsgs, &messagemodel.BlockMessage{
// record which blocks had which messages
bmsgModels = append(bmsgModels, &messagemodel.BlockMessage{
Block: blk.String(),
Message: message.Cid().String(),
})

// so we don't create duplicate message models.
totalUniqGasLimit += message.GasLimit
if _, seen := msgsSeen[message.Cid()]; seen {
continue
}
totalGasLimit += message.GasLimit

// Record this message for processing by later stages
pmsgs = append(pmsgs, visor.NewProcessingMessage(message, int64(ts.Height())))
// record this message for processing by later stages
pmsgModels = append(pmsgModels, visor.NewProcessingMessage(message, int64(ts.Height())))

var msgSize int
if b, err := message.Serialize(); err == nil {
msgSize = len(b)
} else {
return nil, nil, nil, nil, err
}
msgs = append(msgs, &messagemodel.Message{

// record all unique messages
msgModels = append(msgModels, &messagemodel.Message{
Cid: message.Cid().String(),
From: message.From.String(),
To: message.To.String(),
Expand All @@ -233,10 +265,32 @@ func (p *MessageProcessor) fetchMessages(ctx context.Context, ts *types.TipSet)
Method: uint64(message.Method),
Params: message.Params,
})

msgsSeen[message.Cid()] = struct{}{}
}

}
return msgs, bmsgs, pmsgs, nil
newBaseFee := store.ComputeNextBaseFee(ts.Blocks()[0].ParentBaseFee, totalUniqGasLimit, len(ts.Blocks()), ts.Height())
baseFeeRat := new(big.Rat).SetFrac(newBaseFee.Int, new(big.Int).SetUint64(build.FilecoinPrecision))
baseFee, _ := baseFeeRat.Float64()

baseFeeChange := new(big.Rat).SetFrac(newBaseFee.Int, ts.Blocks()[0].ParentBaseFee.Int)
baseFeeChangeF, _ := baseFeeChange.Float64()

return msgModels,
bmsgModels,
pmsgModels,
&messagemodel.MessageGasEconomy{
StateRoot: ts.ParentState().String(),
GasLimitTotal: totalGasLimit,
GasLimitUniqueTotal: totalUniqGasLimit,
BaseFee: baseFee,
BaseFeeChangeLog: math.Log(baseFeeChangeF) / math.Log(1.125),
GasFillRatio: float64(totalGasLimit) / float64(len(ts.Blocks())*build.BlockGasTarget),
GasCapacityRatio: float64(totalUniqGasLimit) / float64(len(ts.Blocks())*build.BlockGasTarget),
GasWasteRatio: float64(totalGasLimit-totalUniqGasLimit) / float64(len(ts.Blocks())*build.BlockGasTarget),
},
nil
}

func (p *MessageProcessor) fetchReceipts(ctx context.Context, ts *types.TipSet) (messagemodel.Receipts, error) {
Expand Down

0 comments on commit 8ed8183

Please sign in to comment.