Skip to content

Commit

Permalink
feat(task): Create chainvis views and refresher (#77)
Browse files Browse the repository at this point in the history
* feat(task): Create chainvis views and refresher

* fix: Put views into slice for processing
  • Loading branch information
placer14 authored Oct 9, 2020
1 parent 9146893 commit 0d67282
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 1 deletion.
19 changes: 19 additions & 0 deletions commands/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/sentinel-visor/tasks/actorstate"
"github.com/filecoin-project/sentinel-visor/tasks/indexer"
"github.com/filecoin-project/sentinel-visor/tasks/message"
"github.com/filecoin-project/sentinel-visor/tasks/views"
)

var Run = &cli.Command{
Expand Down Expand Up @@ -152,6 +153,14 @@ var Run = &cli.Command{
Usage: "Number of gas outputs processors to start",
EnvVars: []string{"VISOR_GASOUTPUTS_WORKERS"},
},

&cli.DurationFlag{
Name: "chainvis-refresh-rate",
Aliases: []string{"crr"},
Value: 0,
Usage: "Refresh frequency for chain visualization views (0 = disables refresh)",
EnvVars: []string{"VISOR_CHAINVIS_REFRESH"},
},
},
Action: func(cctx *cli.Context) error {
// Validate flags
Expand Down Expand Up @@ -257,6 +266,16 @@ var Run = &cli.Command{
})
}

// Include optional refresher for Chain Visualization views
// Zero duration will cause ChainVisRefresher to exit and should not restart
scheduler.Add(schedule.TaskConfig{
Name: "ChainVisRefresher",
Locker: NewGlobalSingleton(ChainVisRefresherLockID, rctx.db), // only need one chain vis refresher anywhere
Task: views.NewChainVisRefresher(rctx.db, cctx.Duration("chainvis-refresh-rate")),
RestartOnFailure: true,
RestartOnCompletion: false,
})

// Start the scheduler and wait for it to complete or to be cancelled.
err = scheduler.Run(ctx)
if !errors.Is(err, context.Canceled) {
Expand Down
1 change: 1 addition & 0 deletions commands/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ func setupLogging(cctx *cli.Context) error {
const (
ChainHeadIndexerLockID = 98981111
ChainHistoryIndexerLockID = 98981112
ChainVisRefresherLockID = 98981113
)

func NewGlobalSingleton(id int64, d *storage.Database) *GlobalSingleton {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ require (
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/multiformats/go-multiaddr v0.3.1
github.com/multiformats/go-multihash v0.0.14
github.com/raulk/clock v1.1.0
github.com/prometheus/client_golang v1.6.0
github.com/raulk/clock v1.1.0
github.com/stretchr/objx v0.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/urfave/cli/v2 v2.2.0
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM=
Expand Down Expand Up @@ -1276,6 +1277,7 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/cors v1.6.0 h1:G9tHG9lebljV9mfp9SNPDL36nCDxmo3zTlAf1YgvzmI=
github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
Expand Down Expand Up @@ -1382,6 +1384,7 @@ github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/
github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4=
Expand Down
81 changes: 81 additions & 0 deletions storage/migrations/7_chainvis_views.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package migrations

import (
"github.com/go-pg/migrations/v8"
)

// Schema version 7 produces views which support queries for
// tipset visualization

// See https://github.com/DigitalMOB2/filecoin-lotus-explorer-playground

func init() {
up := batch(`
CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_chain_data_view AS
SELECT
main_block.cid AS block,
bp.parent AS parent,
main_block.miner,
main_block.height,
main_block.parent_weight AS parentweight,
main_block.timestamp,
main_block.parent_state_root AS parentstateroot,
parent_block.timestamp AS parenttimestamp,
parent_block.height AS parentheight,
mp.raw_bytes_power AS parentpower,
synced.synced_at AS syncedtimestamp,
(SELECT COUNT(*) FROM block_messages WHERE block_messages.block = main_block.cid) AS messages
FROM
block_headers main_block
LEFT JOIN
block_parents bp ON bp.block = main_block.cid
LEFT JOIN
block_headers parent_block ON parent_block.cid = bp.parent
LEFT JOIN
blocks_synced synced ON synced.cid = main_block.cid
LEFT JOIN
miner_power mp ON main_block.parent_state_root = mp.state_root
WITH NO DATA;
CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_orphans_view AS
SELECT
block_headers.cid AS block,
block_headers.miner,
block_headers.height,
block_headers.parent_weight AS parentweight,
block_headers.timestamp,
block_headers.parent_state_root AS parentstateroot,
block_parents.parent AS parent
FROM
block_headers
LEFT JOIN
block_parents ON block_headers.cid = block_parents.parent
WHERE
block_parents.block IS NULL
WITH NO DATA;
CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_blocks_with_parents_view AS
SELECT
block,
parent,
b.miner,
b.height,
b.timestamp
FROM
block_parents
INNER JOIN
block_headers b ON block_parents.block = b.cid
WITH NO DATA;
CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_blocks_view AS
SELECT * FROM block_headers
WITH NO DATA;
`)
down := batch(`
DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_blocks_view;
DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_blocks_with_parents_view;
DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_orphans_view;
DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_chain_data_view;
`)
migrations.MustRegisterTx(up, down)
}
51 changes: 51 additions & 0 deletions tasks/views/chainvis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package views

import (
"context"
"fmt"
"time"

"golang.org/x/xerrors"

"github.com/filecoin-project/sentinel-visor/storage"
"github.com/filecoin-project/sentinel-visor/wait"
)

var chainVisViews = []string{
"chain_visualizer_blocks_view",
"chain_visualizer_blocks_with_parents_view",
"chain_visualizer_chain_data_view",
"chain_visualizer_orphans_view",
}

func NewChainVisRefresher(d *storage.Database, refreshRate time.Duration) *ChainVisRefresher {
return &ChainVisRefresher{
db: d,
refreshRate: refreshRate,
}
}

// ChainVisRefresher is a task which refreshes a set of views that support
// chain visualization queries at a specific refreshRate
type ChainVisRefresher struct {
db *storage.Database
refreshRate time.Duration
}

// Run starts regularly refreshing until context is done or an error occurs
func (r *ChainVisRefresher) Run(ctx context.Context) error {
if r.refreshRate == 0 {
return nil
}
return wait.RepeatUntil(ctx, r.refreshRate, r.refreshView)
}

func (r *ChainVisRefresher) refreshView(ctx context.Context) (bool, error) {
for _, v := range chainVisViews {
_, err := r.db.DB.ExecContext(ctx, fmt.Sprintf("REFRESH MATERIALIZED VIEW %s;", v))
if err != nil {
return true, xerrors.Errorf("refresh %s: %w", v, err)
}
}
return false, nil
}

0 comments on commit 0d67282

Please sign in to comment.