Skip to content

Commit

Permalink
feat(gas outputs): Add Height and ActorName (#270)
Browse files Browse the repository at this point in the history
* Feat(gas outputs): Add Height and ActorName

Gas outputs are very important for many base-fee and gas-usage related views.

The table has no height, which means it needs to be joined with blocks all the
time on something like state_root to get a timestamp. This is likely
innefficient.

Additionally, it needs to be joined with actors to get the actor code. Since
we are "deriving" and already include redundant info, we might also get this
one. This allows easy filtering of messages by actor type and method.

* feat(gas outputs): Migrations: add height and actor_name columns

* Migration 22: set height to bigint

* feat(migrations): add constrains to new derived_gas_outputs columns

* feat(migrations): add height column as primary key in derived_gas_outputs.

* feat(migrations): re-index gasoutputs_height_code migration to 24.

* feat(migrations): fix gas_output_migration.

* Add state_root to derived_gas_outputs index

Co-authored-by: Ian Davis <[email protected]>
  • Loading branch information
hsanjuan and iand authored Dec 3, 2020
1 parent 4021e6c commit 627e829
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 29 deletions.
33 changes: 16 additions & 17 deletions chain/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
derivedmodel "github.com/filecoin-project/sentinel-visor/model/derived"
messagemodel "github.com/filecoin-project/sentinel-visor/model/messages"
visormodel "github.com/filecoin-project/sentinel-visor/model/visor"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
)

type MessageProcessor struct {
Expand Down Expand Up @@ -164,23 +165,20 @@ func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts

outputs := p.node.ComputeGasOutputs(m.Receipt.GasUsed, m.Message.GasLimit, m.BlockHeader.ParentBaseFee, m.Message.GasFeeCap, m.Message.GasPremium)
gasOutput := &derivedmodel.GasOutputs{
// TODO: add Height and ActorName as per https://github.com/filecoin-project/sentinel-visor/pull/270
// Height: msg.Height,
Cid: msg.Cid,
From: msg.From,
To: msg.To,
Value: msg.Value,
GasFeeCap: msg.GasFeeCap,
GasPremium: msg.GasPremium,
GasLimit: msg.GasLimit,
Nonce: msg.Nonce,
Method: msg.Method,
StateRoot: m.BlockHeader.ParentStateRoot.String(),
ExitCode: rcpt.ExitCode,
GasUsed: rcpt.GasUsed,
ParentBaseFee: m.BlockHeader.ParentBaseFee.String(),

// TODO: is SizeBytes really needed here?
Height: msg.Height,
Cid: msg.Cid,
From: msg.From,
To: msg.To,
Value: msg.Value,
GasFeeCap: msg.GasFeeCap,
GasPremium: msg.GasPremium,
GasLimit: msg.GasLimit,
Nonce: msg.Nonce,
Method: msg.Method,
StateRoot: m.BlockHeader.ParentStateRoot.String(),
ExitCode: rcpt.ExitCode,
GasUsed: rcpt.GasUsed,
ParentBaseFee: m.BlockHeader.ParentBaseFee.String(),
SizeBytes: msgSize,
BaseFeeBurn: outputs.BaseFeeBurn.String(),
OverEstimationBurn: outputs.OverEstimationBurn.String(),
Expand All @@ -189,6 +187,7 @@ func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts
Refund: outputs.Refund.String(),
GasRefund: outputs.GasRefund,
GasBurned: outputs.GasBurned,
ActorName: actorstate.ActorNameByCode(m.ToActorCode),
}
gasOutputsResults = append(gasOutputsResults, gasOutput)

Expand Down
9 changes: 3 additions & 6 deletions model/derived/gasoutputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import (

type GasOutputs struct {
tableName struct{} `pg:"derived_gas_outputs"` //nolint: structcheck,unused
Height int64 `pg:",pk,use_zero,notnull"`
Cid string `pg:",pk,notnull"`
StateRoot string `pg:",pk,notnull"`
From string `pg:",notnull"`
To string `pg:",notnull"`
Value string `pg:",notnull"`
Expand All @@ -22,7 +24,7 @@ type GasOutputs struct {
SizeBytes int `pg:",use_zero,notnull"`
Nonce uint64 `pg:",use_zero,notnull"`
Method uint64 `pg:",use_zero,notnull"`
StateRoot string `pg:",notnull"`
ActorName string `pg:",notnull"`
ExitCode int64 `pg:",use_zero,notnull"`
GasUsed int64 `pg:",use_zero,notnull"`
ParentBaseFee string `pg:",notnull"`
Expand Down Expand Up @@ -65,8 +67,3 @@ func (l GasOutputsList) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
}
return nil
}

type ProcessingGasOutputs struct {
Height int64
GasOutputs
}
43 changes: 43 additions & 0 deletions storage/migrations/25_gasoutputs_height_code.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package migrations

import "github.com/go-pg/migrations/v8"

// Schema version 25 adds Height and ActorName to gas outputs table

func init() {
up := batch(`
DO $$
BEGIN
IF (
SELECT count(*)
FROM information_schema.constraint_column_usage
WHERE table_schema = 'public'
AND table_name = 'derived_gas_outputs'
AND constraint_name='derived_gas_outputs_pkey'
AND column_name IN ('height','cid','state_root')
) != 3 -- want all three columns in the index
THEN
-- Can't change primary key while data exists
TRUNCATE TABLE public.derived_gas_outputs;
ALTER TABLE public.derived_gas_outputs ADD COLUMN height bigint NOT NULL;
ALTER TABLE public.derived_gas_outputs ADD COLUMN actor_name text NOT NULL;
ALTER TABLE public.derived_gas_outputs DROP CONSTRAINT derived_gas_outputs_pkey;
ALTER TABLE public.derived_gas_outputs ADD PRIMARY KEY (height,cid,state_root);
END IF;
END
$$;
`)
down := batch(`
TRUNCATE TABLE public.derived_gas_outputs;
ALTER TABLE public.derived_gas_outputs DROP CONSTRAINT derived_gas_outputs_pkey;
ALTER TABLE public.derived_gas_outputs ADD PRIMARY KEY (cid);
ALTER TABLE public.derived_gas_outputs DROP COLUMN height;
ALTER TABLE public.derived_gas_outputs DROP COLUMN actor_name;
`)

migrations.MustRegisterTx(up, down)
}
8 changes: 4 additions & 4 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,10 @@ func useNullIfEmpty(s string) *string {
}

// LeaseGasOutputsMessages leases a set of messages that have receipts for gas output processing. minHeight and maxHeight define an inclusive range of heights to process.
func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) {
func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.GasOutputs, error) {
stop := metrics.Timer(ctx, metrics.BatchSelectionDuration)
defer stop()
var list []*derived.ProcessingGasOutputs
var list []*derived.GasOutputs

if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.QueryContext(ctx, &list, `
Expand Down Expand Up @@ -583,11 +583,11 @@ SELECT * FROM leased;
}

// FindGasOutputsMessages finds a set of messages that have receipts for gas output processing but does not take a lease out. minHeight and maxHeight define an inclusive range of heights to process.
func (d *Database) FindGasOutputsMessages(ctx context.Context, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) {
func (d *Database) FindGasOutputsMessages(ctx context.Context, batchSize int, minHeight, maxHeight int64) ([]*derived.GasOutputs, error) {
stop := metrics.Timer(ctx, metrics.BatchSelectionDuration)
defer stop()

var list []*derived.ProcessingGasOutputs
var list []*derived.GasOutputs

if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.QueryContext(ctx, &list, `
Expand Down
41 changes: 39 additions & 2 deletions tasks/message/gasoutputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package message

import (
"context"
"errors"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/go-pg/pg/v10"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/api/global"
Expand All @@ -15,6 +21,7 @@ import (
"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model/derived"
"github.com/filecoin-project/sentinel-visor/storage"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
"github.com/filecoin-project/sentinel-visor/wait"
)

Expand Down Expand Up @@ -65,7 +72,7 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) (

claimUntil := p.clock.Now().Add(p.leaseLength)

var batch []*derived.ProcessingGasOutputs
var batch []*derived.GasOutputs
var err error

if p.useLeases {
Expand Down Expand Up @@ -104,7 +111,7 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) (

errorLog := log.With("cid", item.Cid)

if err := p.processItem(ctx, node, &item.GasOutputs); err != nil {
if err := p.processItem(ctx, node, item); err != nil {
// Any errors are likely to be problems using the lens, mark this tipset as failed and exit this batch
errorLog.Errorw("failed to process message", "error", err.Error())
if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Height, item.Cid, p.clock.Now(), err.Error()); err != nil {
Expand All @@ -125,6 +132,35 @@ func (p *GasOutputsProcessor) processItem(ctx context.Context, node lens.API, it
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
defer stop()

// Note: this item will only be processed if there are receipts for
// it, which means there should be a tipset at height+1. This is only
// used to get the destination actor code, so we don't care about side
// chains.
child, err := node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(item.Height+1), types.NewTipSetKey())
if err != nil {
return xerrors.Errorf("Failed to load child tipset: %w", err)
}

st, err := state.LoadStateTree(node.Store(), child.ParentState())
if err != nil {
return xerrors.Errorf("load state tree when gas outputs for %s: %w", item.Cid, err)
}

dstAddr, err := address.NewFromString(item.To)
if err != nil {
return xerrors.Errorf("parse to address failed for gas outputs in %s: %w", item.Cid, err)
}

var dstActorCode cid.Cid
dstActor, err := st.GetActor(dstAddr)
if err != nil {
if !errors.Is(err, types.ErrActorNotFound) {
return xerrors.Errorf("get destination actor for gas outputs %s failed: %w", item.Cid, err)
}
} else {
dstActorCode = dstActor.Code
}

baseFee, err := big.FromString(item.ParentBaseFee)
if err != nil {
return xerrors.Errorf("parse fee cap: %w", err)
Expand All @@ -144,6 +180,7 @@ func (p *GasOutputsProcessor) processItem(ctx context.Context, node lens.API, it
outputs := node.ComputeGasOutputs(item.GasUsed, item.GasLimit, baseFee, feeCap, gasPremium)
cgoStop()

item.ActorName = actorstate.ActorNameByCode(dstActorCode)
item.BaseFeeBurn = outputs.BaseFeeBurn.String()
item.OverEstimationBurn = outputs.OverEstimationBurn.String()
item.MinerPenalty = outputs.MinerPenalty.String()
Expand Down

0 comments on commit 627e829

Please sign in to comment.