Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gas outputs): Add Height and ActorName #270

Merged
merged 10 commits into from
Dec 3, 2020
7 changes: 2 additions & 5 deletions model/derived/gasoutputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type GasOutputs struct {
tableName struct{} `pg:"derived_gas_outputs"` //nolint: structcheck,unused
Height int64 `pg:",pk,use_zero,notnull"`
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to backfill the value for this column.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The value corresponds to the height of the message I think (not that of the receipts).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's the height of the message. Backfilling is a problem. With the current version it will take some time. With the new in-order processing it takes <1s per tipset, but that's still ~211000 seconds = ~2.4 days unless we shard the processing.

An alternative is to figure out a query that can update this table by joining with messages. That will still be slow to run, several hours at least.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Custom in-order processing that just does messages and just updates this should be faster (?). That said, 2.4 days should be ok.

hsanjuan marked this conversation as resolved.
Show resolved Hide resolved
Cid string `pg:",pk,notnull"`
From string `pg:",notnull"`
To string `pg:",notnull"`
Expand All @@ -22,6 +23,7 @@ type GasOutputs struct {
SizeBytes int `pg:",use_zero,notnull"`
Nonce uint64 `pg:",use_zero,notnull"`
Method uint64 `pg:",use_zero,notnull"`
ActorName string `pg:",notnull"`
StateRoot string `pg:",notnull"`
ExitCode int64 `pg:",use_zero,notnull"`
GasUsed int64 `pg:",use_zero,notnull"`
Expand Down Expand Up @@ -65,8 +67,3 @@ func (l GasOutputsList) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
}
return nil
}

type ProcessingGasOutputs struct {
Height int64
GasOutputs
}
hsanjuan marked this conversation as resolved.
Show resolved Hide resolved
22 changes: 22 additions & 0 deletions storage/migrations/25_gasoutputs_height_code.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package migrations

import "github.com/go-pg/migrations/v8"

// Schema version 22 adds Height and ActorName to gas outputs table

func init() {
up := batch(`
ALTER TABLE public.derived_gas_outputs ADD COLUMN height bigint NOT NULL;
ALTER TABLE public.derived_gas_outputs ADD COLUMN actor_name text NOT NULL;
ALTER TABLE public.derived_gas_outputs DROP CONSTRAINT derived_gas_outputs_pkey;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see in other migrations we do _pk instead of _pkey, should I change that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pkey is the convention used by postgres when you create a primary key

ALTER TABLE public.derived_gas_outputs ADD PRIMARY KEY (cid, height);
`)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@iand can shall we leave like this, or add state_root too?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add state_root - I'll do it

down := batch(`
ALTER TABLE public.derived_gas_outputs DROP CONSTRAINT derived_gas_outputs_pkey;
ALTER TABLE public.derived_gas_outputs ADD PRIMARY KEY (cid);
ALTER TABLE public.derived_gas_outputs DROP COLUMN height;
ALTER TABLE public.derived_gas_outputs DROP COLUMN actor_name;
`)

migrations.MustRegisterTx(up, down)
}
8 changes: 4 additions & 4 deletions storage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,10 +540,10 @@ func useNullIfEmpty(s string) *string {
}

// LeaseGasOutputsMessages leases a set of messages that have receipts for gas output processing. minHeight and maxHeight define an inclusive range of heights to process.
func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) {
func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.GasOutputs, error) {
stop := metrics.Timer(ctx, metrics.BatchSelectionDuration)
defer stop()
var list []*derived.ProcessingGasOutputs
var list []*derived.GasOutputs

if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.QueryContext(ctx, &list, `
Expand Down Expand Up @@ -583,11 +583,11 @@ SELECT * FROM leased;
}

// FindGasOutputsMessages finds a set of messages that have receipts for gas output processing but does not take a lease out. minHeight and maxHeight define an inclusive range of heights to process.
func (d *Database) FindGasOutputsMessages(ctx context.Context, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) {
func (d *Database) FindGasOutputsMessages(ctx context.Context, batchSize int, minHeight, maxHeight int64) ([]*derived.GasOutputs, error) {
stop := metrics.Timer(ctx, metrics.BatchSelectionDuration)
defer stop()

var list []*derived.ProcessingGasOutputs
var list []*derived.GasOutputs

if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error {
_, err := tx.QueryContext(ctx, &list, `
Expand Down
41 changes: 39 additions & 2 deletions tasks/message/gasoutputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package message

import (
"context"
"errors"
"time"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/types"
"github.com/go-pg/pg/v10"
"github.com/ipfs/go-cid"
"github.com/raulk/clock"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/api/global"
Expand All @@ -15,6 +21,7 @@ import (
"github.com/filecoin-project/sentinel-visor/metrics"
"github.com/filecoin-project/sentinel-visor/model/derived"
"github.com/filecoin-project/sentinel-visor/storage"
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
"github.com/filecoin-project/sentinel-visor/wait"
)

Expand Down Expand Up @@ -65,7 +72,7 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) (

claimUntil := p.clock.Now().Add(p.leaseLength)

var batch []*derived.ProcessingGasOutputs
var batch []*derived.GasOutputs
var err error

if p.useLeases {
Expand Down Expand Up @@ -104,7 +111,7 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) (

errorLog := log.With("cid", item.Cid)

if err := p.processItem(ctx, node, &item.GasOutputs); err != nil {
if err := p.processItem(ctx, node, item); err != nil {
// Any errors are likely to be problems using the lens, mark this tipset as failed and exit this batch
errorLog.Errorw("failed to process message", "error", err.Error())
if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Height, item.Cid, p.clock.Now(), err.Error()); err != nil {
Expand All @@ -125,6 +132,35 @@ func (p *GasOutputsProcessor) processItem(ctx context.Context, node lens.API, it
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
defer stop()

// Note: this item will only be processed if there are receipts for
// it, which means there should be a tipset at height+1. This is only
// used to get the destination actor code, so we don't care about side
// chains.
Comment on lines +135 to +138
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps its worth adding a check to ensure child != item? Alternatively, null rounds can be handled via: https://github.com/filecoin-project/sentinel-visor/pull/214/files#diff-a771f3c826cfe5ebe05c1a003873af47456ee3e16188c1e5d0a1162c0b3e27acR40

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that I just want to extract the dstActorCode, is it a problem if child == parent? The actor code will be the same even if it comes from a slightly wrong state. But maybe I'm missing something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the destination actor may no exist in child (when child == parent) if this is the first message being sent to the destination actor. But based on the below code maybe that isn't an issue -- We just won't know the designation actor code if the actor is created in a block preceding a null round.

child, err := node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(item.Height+1), types.NewTipSetKey())
if err != nil {
return xerrors.Errorf("Failed to load child tipset: %w", err)
}

st, err := state.LoadStateTree(node.Store(), child.ParentState())
if err != nil {
return xerrors.Errorf("load state tree when gas outputs for %s: %w", item.Cid, err)
}

dstAddr, err := address.NewFromString(item.To)
if err != nil {
return xerrors.Errorf("parse to address failed for gas outputs in %s: %w", item.Cid, err)
}

var dstActorCode cid.Cid
dstActor, err := st.GetActor(dstAddr)
if err != nil {
if !errors.Is(err, types.ErrActorNotFound) {
return xerrors.Errorf("get destination actor for gas outputs %s failed: %w", item.Cid, err)
}
} else {
dstActorCode = dstActor.Code
}

Comment on lines +139 to +163
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish there was a better way, but this should be ok if the state is cached at some layer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#249 is much more efficient but will be a few days before it's ready. Currently checking that the data it produces matches the existing data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, still need to support it in the "standard" way which I trust will become way faster with store improvement and block caching at some point.

baseFee, err := big.FromString(item.ParentBaseFee)
if err != nil {
return xerrors.Errorf("parse fee cap: %w", err)
Expand All @@ -144,6 +180,7 @@ func (p *GasOutputsProcessor) processItem(ctx context.Context, node lens.API, it
outputs := node.ComputeGasOutputs(item.GasUsed, item.GasLimit, baseFee, feeCap, gasPremium)
cgoStop()

item.ActorName = actorstate.ActorNameByCode(dstActorCode)
item.BaseFeeBurn = outputs.BaseFeeBurn.String()
item.OverEstimationBurn = outputs.OverEstimationBurn.String()
item.MinerPenalty = outputs.MinerPenalty.String()
Expand Down