Skip to content

Commit

Permalink
feat: abstract model storage and add csv output for walk command (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
iand authored Dec 15, 2020
1 parent 37d3fd7 commit 4ae639c
Show file tree
Hide file tree
Showing 70 changed files with 914 additions and 1,038 deletions.
6 changes: 3 additions & 3 deletions chain/actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewActorStateProcessor(opener lens.APIOpener, extracterMap ActorExtractorMa
return p
}

func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) {
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
if err != nil {
Expand Down Expand Up @@ -60,7 +60,7 @@ func (p *ActorStateProcessor) ProcessActors(ctx context.Context, ts *types.TipSe
}
}

data := make(PersistableWithTxList, 0, len(actors))
data := make(model.PersistableList, 0, len(actors))
errorsDetected := make([]*ActorStateError, 0, len(actors))
skippedActors := 0

Expand Down Expand Up @@ -172,7 +172,7 @@ type ActorStateResult struct {
Address string
Error error
SkippedParse bool
Data model.PersistableWithTx
Data model.Persistable
}

type ActorStateError struct {
Expand Down
4 changes: 2 additions & 2 deletions chain/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ func NewBlockProcessor() *BlockProcessor {
return &BlockProcessor{}
}

func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
var pl PersistableWithTxList
func (p *BlockProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
var pl model.PersistableList
for _, bh := range ts.Blocks() {
select {
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion chain/economics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewChainEconomicsProcessor(opener lens.APIOpener) *ChainEconomicsProcessor
}
}

func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
func (p *ChainEconomicsProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
if err != nil {
Expand Down
20 changes: 10 additions & 10 deletions chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var _ indexer.TipSetObserver = (*TipSetIndexer)(nil)
// A TipSetWatcher waits for tipsets and persists their block data into a database.
type TipSetIndexer struct {
window time.Duration
storage Storage
storage model.Storage
processors map[string]TipSetProcessor
actorProcessors map[string]ActorProcessor
name string
Expand All @@ -55,7 +55,7 @@ type TipSetIndexer struct {
// and persistence are concurrent. Extraction of the a tipset can proceed while data from the previous extraction is
// being persisted. The indexer may be given a time window in which to complete data extraction. The name of the
// indexer is used as the reporter in the visor_processing_reports table.
func NewTipSetIndexer(o lens.APIOpener, d Storage, window time.Duration, name string, tasks []string) (*TipSetIndexer, error) {
func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, name string, tasks []string) (*TipSetIndexer, error) {
tsi := &TipSetIndexer{
storage: d,
window: window,
Expand Down Expand Up @@ -133,7 +133,7 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
results := make(chan *TaskResult, len(t.processors)+len(t.actorProcessors))

// A map to gather the persistable outputs from each task
taskOutputs := make(map[string]PersistableWithTxList, len(t.processors)+len(t.actorProcessors))
taskOutputs := make(map[string]model.PersistableList, len(t.processors)+len(t.actorProcessors))

// Run each tipset processing task concurrently
for name, p := range t.processors {
Expand Down Expand Up @@ -189,7 +189,7 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
Status: visormodel.ProcessingStatusError,
ErrorsDetected: terr,
}
taskOutputs[name] = PersistableWithTxList{report}
taskOutputs[name] = model.PersistableList{report}
}
return terr
}
Expand Down Expand Up @@ -243,7 +243,7 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
llt.Infow("task report", "status", res.Report.Status, "time", res.Report.CompletedAt.Sub(res.Report.StartedAt))

// Persist the processing report and the data in a single transaction
taskOutputs[res.Task] = PersistableWithTxList{res.Report, res.Data}
taskOutputs[res.Task] = model.PersistableList{res.Report, res.Data}
}

// remember the last tipset we observed
Expand Down Expand Up @@ -277,9 +277,9 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {

// Persist each processor's data concurrently since they don't overlap
for task, p := range taskOutputs {
go func(task string, p model.PersistableWithTx) {
go func(task string, p model.Persistable) {
defer wg.Done()
if err := t.storage.Persist(ctx, p); err != nil {
if err := t.storage.PersistBatch(ctx, p); err != nil {
ll.Errorw("persistence failed", "task", task, "error", err)
return
}
Expand Down Expand Up @@ -356,19 +356,19 @@ type TaskResult struct {
Task string
Error error
Report *visormodel.ProcessingReport
Data model.PersistableWithTx
Data model.Persistable
}

type TipSetProcessor interface {
// ProcessTipSet processes a tipset. If error is non-nil then the processor encountered a fatal error.
// Any data returned must be accompanied by a processing report.
ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error)
ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error)
Close() error
}

type ActorProcessor interface {
// ProcessActor processes a set of actors. If error is non-nil then the processor encountered a fatal error.
// Any data returned must be accompanied by a processing report.
ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, actors map[string]types.Actor) (model.PersistableWithTx, *visormodel.ProcessingReport, error)
ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, actors map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error)
Close() error
}
8 changes: 4 additions & 4 deletions chain/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewMessageProcessor(opener lens.APIOpener) *MessageProcessor {
}
}

func (p *MessageProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
func (p *MessageProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
if err != nil {
Expand All @@ -47,7 +47,7 @@ func (p *MessageProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet)
p.closer = closer
}

var data model.PersistableWithTx
var data model.Persistable
var report *visormodel.ProcessingReport
var err error

Expand Down Expand Up @@ -75,7 +75,7 @@ func (p *MessageProcessor) ProcessTipSet(ctx context.Context, ts *types.TipSet)
}

// Note that all this processing is in the context of the parent tipset. The child is only used for receipts
func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts *types.TipSet) (model.PersistableWithTx, *visormodel.ProcessingReport, error) {
func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
report := &visormodel.ProcessingReport{
Height: int64(pts.Height()),
StateRoot: pts.ParentState().String(),
Expand Down Expand Up @@ -234,7 +234,7 @@ func (p *MessageProcessor) processExecutedMessages(ctx context.Context, ts, pts
report.ErrorsDetected = errorsDetected
}

return PersistableWithTxList{
return model.PersistableList{
messageResults,
receiptResults,
blockMessageResults,
Expand Down
39 changes: 0 additions & 39 deletions chain/storage.go

This file was deleted.

32 changes: 23 additions & 9 deletions commands/walk.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/chain"
"github.com/filecoin-project/sentinel-visor/model"
"github.com/filecoin-project/sentinel-visor/storage"
"github.com/filecoin-project/sentinel-visor/tasks/indexer"
)

Expand All @@ -39,6 +41,11 @@ var Walk = &cli.Command{
Value: strings.Join([]string{chain.BlocksTask, chain.MessagesTask, chain.ChainEconomicsTask, chain.ActorStatesRawTask}, ","),
EnvVars: []string{"VISOR_WALK_TASKS"},
},
&cli.StringFlag{
Name: "csv",
Usage: "Path to write csv files.",
Hidden: true,
},
},
Action: walk,
}
Expand Down Expand Up @@ -76,17 +83,24 @@ func walk(cctx *cli.Context) error {
lensCloser()
}()

var storage chain.Storage = &chain.NullStorage{}
if cctx.String("db") == "" {
log.Warnw("database not specified, data will not be persisted")
} else {
db, err := setupDatabase(cctx)
var strg model.Storage = &storage.NullStorage{}
if cctx.String("csv") != "" {
csvStorage, err := storage.NewCSVStorage(cctx.String("csv"))
if err != nil {
return xerrors.Errorf("setup database: %w", err)
return xerrors.Errorf("new csv storage: %w", err)
}
strg = csvStorage
} else {
if cctx.String("db") == "" {
log.Warnw("database not specified, data will not be persisted")
} else {
db, err := setupDatabase(cctx)
if err != nil {
return xerrors.Errorf("setup database: %w", err)
}
strg = db
}
storage = db
}

// Set up a context that is canceled when the command is interrupted
ctx, cancel := context.WithCancel(cctx.Context)
defer cancel()
Expand All @@ -104,7 +118,7 @@ func walk(cctx *cli.Context) error {

scheduler := schedule.NewScheduler(cctx.Duration("task-delay"))

tsIndexer, err := chain.NewTipSetIndexer(lensOpener, storage, 0, cctx.String("name"), tasks)
tsIndexer, err := chain.NewTipSetIndexer(lensOpener, strg, 0, cctx.String("name"), tasks)
if err != nil {
return xerrors.Errorf("setup indexer: %w", err)
}
Expand Down
4 changes: 3 additions & 1 deletion commands/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/chain"
"github.com/filecoin-project/sentinel-visor/model"
"github.com/filecoin-project/sentinel-visor/storage"
"github.com/filecoin-project/sentinel-visor/tasks/indexer"
)

Expand Down Expand Up @@ -63,7 +65,7 @@ func watch(cctx *cli.Context) error {
lensCloser()
}()

var storage chain.Storage = &chain.NullStorage{}
var storage model.Storage = &storage.NullStorage{}
if cctx.String("db") == "" {
log.Warnw("database not specified, data will not be persisted")
} else {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ require (
github.com/go-pg/migrations/v8 v8.0.1
github.com/go-pg/pg/v10 v10.3.1
github.com/go-pg/pgext v0.1.4
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4
github.com/hashicorp/golang-lru v0.5.4
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.7
Expand All @@ -28,6 +29,7 @@ require (
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4
github.com/ipld/go-ipld-prime v0.5.1-0.20201021195245-109253e8a018
github.com/jackc/pgx/v4 v4.9.0
github.com/jinzhu/inflection v1.0.0
github.com/lib/pq v1.8.0
github.com/libp2p/go-libp2p-core v0.7.0
github.com/mitchellh/go-homedir v1.1.0
Expand Down
7 changes: 2 additions & 5 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=
Expand Down Expand Up @@ -357,6 +356,8 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4 h1:Q7s2AN3DhFJKOnzO0uTKLhJTfXTEcXcvw5ylf2BHJw4=
github.com/gocarina/gocsv v0.0.0-20201208093247-67c824bc04d4/go.mod h1:5YoVOkjYAQumqlV356Hj3xeYh4BdZuLE0/nRkf2NKkI=
github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
Expand Down Expand Up @@ -715,7 +716,6 @@ github.com/ipld/go-ipld-prime-proto v0.1.0 h1:j7gjqrfwbT4+gXpHwEx5iMssma3mnctC7Y
github.com/ipld/go-ipld-prime-proto v0.1.0/go.mod h1:11zp8f3sHVgIqtb/c9Kr5ZGqpnCLF1IVTNOez9TopzE=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c=
github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52/go.mod h1:fdg+/X9Gg4AsAIzWpEHwnqd+QY3b7lajxyjE1m4hkq4=
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
Expand All @@ -734,7 +734,6 @@ github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye47
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
Expand Down Expand Up @@ -1433,7 +1432,6 @@ github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
Expand Down Expand Up @@ -1543,7 +1541,6 @@ github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
Expand Down
Loading

0 comments on commit 4ae639c

Please sign in to comment.