Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Visor Vector Builder & Executor #370

Merged
merged 10 commits into from
Feb 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ visor
sentinel-visor

build/.*
vector/data/*.json
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure the vector files don't get pushed to github.

2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ build/.update-modules:

.PHONY: deps
deps: build/.update-modules
cd ./vector; ./fetch_vectors.sh

# test starts dependencies and runs all tests
.PHONY: test
Expand Down Expand Up @@ -77,6 +78,7 @@ docker-image:

clean:
rm -rf $(CLEAN) $(BINS)
rm ./vector/data/*json
.PHONY: clean

dist-clean:
Expand Down
4 changes: 4 additions & 0 deletions chain/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ type ActorExtractorMap interface {
GetExtractor(code cid.Cid) (actorstate.ActorStateExtractor, bool)
}

type ActorExtractorFilter interface {
AllowAddress(addr string) bool
}

// A RawActorExtractorMap extracts all types of actors using basic actor extraction which only parses shallow state.
type RawActorExtractorMap struct{}

Expand Down
13 changes: 13 additions & 0 deletions chain/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package chain

func NewAddressFilter(addr string) *AddressFilter {
return &AddressFilter{address: addr}
}

type AddressFilter struct {
address string
}

func (f *AddressFilter) Allow(addr string) bool {
return f.address == addr
}
23 changes: 22 additions & 1 deletion chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,22 @@ type TipSetIndexer struct {
node lens.API
opener lens.APIOpener
closer lens.APICloser
addressFilter *AddressFilter
}

type TipSetIndexerOpt func(t *TipSetIndexer)

func AddressFilterOpt(f *AddressFilter) TipSetIndexerOpt {
return func(t *TipSetIndexer) {
t.addressFilter = f
}
}

// A TipSetIndexer extracts block, message and actor state data from a tipset and persists it to storage. Extraction
// and persistence are concurrent. Extraction of the a tipset can proceed while data from the previous extraction is
// being persisted. The indexer may be given a time window in which to complete data extraction. The name of the
// indexer is used as the reporter in the visor_processing_reports table.
func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, name string, tasks []string) (*TipSetIndexer, error) {
func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, name string, tasks []string, options ...TipSetIndexerOpt) (*TipSetIndexer, error) {
tsi := &TipSetIndexer{
storage: d,
window: window,
Expand Down Expand Up @@ -123,6 +132,11 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n
return nil, xerrors.Errorf("unknown task: %s", task)
}
}

for _, opt := range options {
opt(tsi)
}

return tsi, nil
}

Expand Down Expand Up @@ -223,6 +237,13 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
if len(t.actorProcessors) > 0 {
changes, err := t.node.StateChangedActors(tctx, parent.ParentState(), child.ParentState())
if err == nil {
if t.addressFilter != nil {
for addr := range changes {
if !t.addressFilter.Allow(addr) {
delete(changes, addr)
}
}
}
for name, p := range t.actorProcessors {
inFlight++
go t.runActorProcessor(tctx, p, name, child, parent, changes, results)
Expand Down
140 changes: 140 additions & 0 deletions commands/vector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package commands

import (
"context"
"os"
"os/signal"
"strings"
"syscall"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/chain"
"github.com/filecoin-project/sentinel-visor/vector"
)

var Vector = &cli.Command{
Name: "vector",
Usage: "Vector tooling for Visor.",
Subcommands: []*cli.Command{
BuildVector,
ExecuteVector,
},
}

var BuildVector = &cli.Command{
Name: "build",
Usage: "Create a vector.",
Flags: []cli.Flag{
&cli.Int64Flag{
Name: "from",
Usage: "Limit actor and message processing to tipsets at or above `HEIGHT`",
EnvVars: []string{"VISOR_HEIGHT_FROM"},
},
&cli.Int64Flag{
Name: "to",
Usage: "Limit actor and message processing to tipsets at or below `HEIGHT`",
Value: estimateCurrentEpoch(),
DefaultText: "current epoch",
EnvVars: []string{"VISOR_HEIGHT_TO"},
},
&cli.StringFlag{
Name: "tasks",
Usage: "Comma separated list of tasks to build. Each task is reported separately in the database.",
Value: strings.Join([]string{chain.BlocksTask}, ","),
EnvVars: []string{"VISOR_VECTOR_TASKS"},
},
&cli.StringFlag{
Name: "actor-address",
Usage: "Address of an actor.",
},
&cli.StringFlag{
Name: "vector-file",
Usage: "Path of vector file.",
Required: true,
},
&cli.StringFlag{
Name: "vector-desc",
Usage: "Short description of the test vector.",
Required: true,
},
},
Action: build,
}

func build(cctx *cli.Context) error {
// Set up a context that is canceled when the command is interrupted
ctx, cancel := context.WithCancel(cctx.Context)

// Set up a signal handler to cancel the context
go func() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT)
select {
case <-interrupt:
cancel()
case <-ctx.Done():
}
}()

if err := setupLogging(cctx); err != nil {
return xerrors.Errorf("setup logging: %w", err)
}

builder, err := vector.NewBuilder(cctx)
if err != nil {
return err
}

schema, err := builder.Build(ctx)
if err != nil {
return err
}

return schema.Persist(cctx.String("vector-file"))
}

var ExecuteVector = &cli.Command{
Name: "execute",
Usage: "execute a test vector",
Flags: []cli.Flag{
&cli.StringFlag{
Name: "vector-file",
Usage: "Path to vector file.",
Required: true,
},
},
Action: execute,
}

func execute(cctx *cli.Context) error {
// Set up a context that is canceled when the command is interrupted
ctx, cancel := context.WithCancel(cctx.Context)

// Set up a signal handler to cancel the context
go func() {
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, syscall.SIGTERM, syscall.SIGINT)
select {
case <-interrupt:
cancel()
case <-ctx.Done():
}
}()

if err := setupLogging(cctx); err != nil {
return xerrors.Errorf("setup logging: %w", err)
}
runner, err := vector.NewRunner(ctx, cctx.String("vector-file"), 0)
if err != nil {
return err
}

err = runner.Run(ctx)
if err != nil {
return err
}

return runner.Validate(ctx)
}
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@ require (
github.com/go-pg/migrations/v8 v8.0.1
github.com/go-pg/pg/v10 v10.3.1
github.com/go-pg/pgext v0.1.4
github.com/google/go-cmp v0.5.2
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-blockservice v0.1.4
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipld-cbor v0.0.5
github.com/ipfs/go-ipld-format v0.2.0
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4
github.com/ipfs/go-merkledag v0.3.2
github.com/ipld/go-car v0.1.1-0.20201119040415-11b6074b6d4d
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
github.com/jackc/pgx/v4 v4.9.0
github.com/lib/pq v1.8.0
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1572,6 +1572,7 @@ github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.2 h1:gsqYFH8bb9ekPA12kRo0hfjngWQjkJPlN9R0N78BoUo=
github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
Expand Down
2 changes: 1 addition & 1 deletion lens/carrepo/carrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ func NewAPIOpener(c *cli.Context) (lens.APIOpener, lens.APICloser, error) {
return &tsk, nil
}

return util.NewAPIOpener(c, cacheDB, h)
return util.NewAPIOpener(c.Context, cacheDB, h, c.Int("lens-cache-hint"))
}
2 changes: 2 additions & 0 deletions lens/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lens

import (
"context"
"github.com/filecoin-project/lotus/node/modules/dtypes"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
Expand Down Expand Up @@ -55,6 +56,7 @@ type StateAPI interface {
StateReadState(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*api.ActorState, error)
StateGetReceipt(ctx context.Context, bcid cid.Cid, tsk types.TipSetKey) (*types.MessageReceipt, error)
StateVMCirculatingSupplyInternal(context.Context, types.TipSetKey) (api.CirculatingSupply, error)
StateNetworkName(context.Context) (dtypes.NetworkName, error)
}

type APICloser func()
Expand Down
2 changes: 1 addition & 1 deletion lens/s3repo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ func NewAPIOpener(c *cli.Context) (lens.APIOpener, lens.APICloser, error) {
return nil, nil, err
}

return util.NewAPIOpener(c, bs, bs.(*S3Blockstore).getMasterTsKey)
return util.NewAPIOpener(c.Context, bs, bs.(*S3Blockstore).getMasterTsKey, c.Int("lens-cache-hint"))
}
2 changes: 1 addition & 1 deletion lens/sqlrepo/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ func NewAPIOpener(c *cli.Context) (lens.APIOpener, lens.APICloser, error) {
return nil, nil, err
}

return util.NewAPIOpener(c, bs, bs.(*SqlBlockstore).getMasterTsKey)
return util.NewAPIOpener(c.Context, bs, bs.(*SqlBlockstore).getMasterTsKey, c.Int("lens-cache-hint"))
}
9 changes: 4 additions & 5 deletions lens/util/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
peer "github.com/libp2p/go-libp2p-core/peer"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/lens"
Expand All @@ -43,7 +42,7 @@ type APIOpener struct {

type HeadMthd func(ctx context.Context, lookback int) (*types.TipSetKey, error)

func NewAPIOpener(c *cli.Context, bs blockstore.Blockstore, head HeadMthd) (*APIOpener, lens.APICloser, error) {
func NewAPIOpener(ctx context.Context, bs blockstore.Blockstore, head HeadMthd, cacheHint int) (*APIOpener, lens.APICloser, error) {
rapi := LensAPI{}

if _, _, err := ulimit.ManageFdLimit(); err != nil {
Expand All @@ -65,7 +64,7 @@ func NewAPIOpener(c *cli.Context, bs blockstore.Blockstore, head HeadMthd) (*API

const safetyLookBack = 5

headKey, err := head(c.Context, safetyLookBack)
headKey, err := head(ctx, safetyLookBack)
if err != nil {
return nil, nil, err
}
Expand All @@ -91,8 +90,8 @@ func NewAPIOpener(c *cli.Context, bs blockstore.Blockstore, head HeadMthd) (*API
lr.Close()
}

rapi.Context = c.Context
rapi.cacheSize = c.Int("lens-cache-hint")
rapi.Context = ctx
rapi.cacheSize = cacheHint
return &APIOpener{rapi: &rapi}, sf, nil
}

Expand Down
Loading