From 0d67282a0ae56413c7d8f0eec4949b1a7723ab46 Mon Sep 17 00:00:00 2001 From: Mike Greenberg Date: Thu, 8 Oct 2020 22:27:47 -0400 Subject: [PATCH] feat(task): Create chainvis views and refresher (#77) * feat(task): Create chainvis views and refresher * fix: Put views into slice for processing --- commands/run.go | 19 ++++++ commands/setup.go | 1 + go.mod | 2 +- go.sum | 3 + storage/migrations/7_chainvis_views.go | 81 ++++++++++++++++++++++++++ tasks/views/chainvis.go | 51 ++++++++++++++++ 6 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 storage/migrations/7_chainvis_views.go create mode 100644 tasks/views/chainvis.go diff --git a/commands/run.go b/commands/run.go index 8bea9d3be..a163be336 100644 --- a/commands/run.go +++ b/commands/run.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/sentinel-visor/tasks/actorstate" "github.com/filecoin-project/sentinel-visor/tasks/indexer" "github.com/filecoin-project/sentinel-visor/tasks/message" + "github.com/filecoin-project/sentinel-visor/tasks/views" ) var Run = &cli.Command{ @@ -152,6 +153,14 @@ var Run = &cli.Command{ Usage: "Number of gas outputs processors to start", EnvVars: []string{"VISOR_GASOUTPUTS_WORKERS"}, }, + + &cli.DurationFlag{ + Name: "chainvis-refresh-rate", + Aliases: []string{"crr"}, + Value: 0, + Usage: "Refresh frequency for chain visualization views (0 = disables refresh)", + EnvVars: []string{"VISOR_CHAINVIS_REFRESH"}, + }, }, Action: func(cctx *cli.Context) error { // Validate flags @@ -257,6 +266,16 @@ var Run = &cli.Command{ }) } + // Include optional refresher for Chain Visualization views + // Zero duration will cause ChainVisRefresher to exit and should not restart + scheduler.Add(schedule.TaskConfig{ + Name: "ChainVisRefresher", + Locker: NewGlobalSingleton(ChainVisRefresherLockID, rctx.db), // only need one chain vis refresher anywhere + Task: views.NewChainVisRefresher(rctx.db, cctx.Duration("chainvis-refresh-rate")), + RestartOnFailure: true, + RestartOnCompletion: false, + }) + // Start the scheduler and wait for it to complete or to be cancelled. err = scheduler.Run(ctx) if !errors.Is(err, context.Canceled) { diff --git a/commands/setup.go b/commands/setup.go index cae3de4ec..c750f2d2b 100644 --- a/commands/setup.go +++ b/commands/setup.go @@ -175,6 +175,7 @@ func setupLogging(cctx *cli.Context) error { const ( ChainHeadIndexerLockID = 98981111 ChainHistoryIndexerLockID = 98981112 + ChainVisRefresherLockID = 98981113 ) func NewGlobalSingleton(id int64, d *storage.Database) *GlobalSingleton { diff --git a/go.mod b/go.mod index 9e78b3d1b..563e5221d 100644 --- a/go.mod +++ b/go.mod @@ -24,8 +24,8 @@ require ( github.com/mattn/go-isatty v0.0.12 // indirect github.com/multiformats/go-multiaddr v0.3.1 github.com/multiformats/go-multihash v0.0.14 - github.com/raulk/clock v1.1.0 github.com/prometheus/client_golang v1.6.0 + github.com/raulk/clock v1.1.0 github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/testify v1.6.1 github.com/urfave/cli/v2 v2.2.0 diff --git a/go.sum b/go.sum index 7d93d0559..21bd9a4bd 100644 --- a/go.sum +++ b/go.sum @@ -153,6 +153,7 @@ github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= +github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= @@ -1276,6 +1277,7 @@ github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6L github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v1.6.0 h1:G9tHG9lebljV9mfp9SNPDL36nCDxmo3zTlAf1YgvzmI= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= +github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -1382,6 +1384,7 @@ github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/ github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= +github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4= diff --git a/storage/migrations/7_chainvis_views.go b/storage/migrations/7_chainvis_views.go new file mode 100644 index 000000000..de7150a1d --- /dev/null +++ b/storage/migrations/7_chainvis_views.go @@ -0,0 +1,81 @@ +package migrations + +import ( + "github.com/go-pg/migrations/v8" +) + +// Schema version 7 produces views which support queries for +// tipset visualization + +// See https://github.com/DigitalMOB2/filecoin-lotus-explorer-playground + +func init() { + up := batch(` +CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_chain_data_view AS + SELECT + main_block.cid AS block, + bp.parent AS parent, + main_block.miner, + main_block.height, + main_block.parent_weight AS parentweight, + main_block.timestamp, + main_block.parent_state_root AS parentstateroot, + parent_block.timestamp AS parenttimestamp, + parent_block.height AS parentheight, + mp.raw_bytes_power AS parentpower, + synced.synced_at AS syncedtimestamp, + (SELECT COUNT(*) FROM block_messages WHERE block_messages.block = main_block.cid) AS messages + FROM + block_headers main_block + LEFT JOIN + block_parents bp ON bp.block = main_block.cid + LEFT JOIN + block_headers parent_block ON parent_block.cid = bp.parent + LEFT JOIN + blocks_synced synced ON synced.cid = main_block.cid + LEFT JOIN + miner_power mp ON main_block.parent_state_root = mp.state_root +WITH NO DATA; + +CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_orphans_view AS + SELECT + block_headers.cid AS block, + block_headers.miner, + block_headers.height, + block_headers.parent_weight AS parentweight, + block_headers.timestamp, + block_headers.parent_state_root AS parentstateroot, + block_parents.parent AS parent + FROM + block_headers + LEFT JOIN + block_parents ON block_headers.cid = block_parents.parent + WHERE + block_parents.block IS NULL +WITH NO DATA; + +CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_blocks_with_parents_view AS + SELECT + block, + parent, + b.miner, + b.height, + b.timestamp + FROM + block_parents + INNER JOIN + block_headers b ON block_parents.block = b.cid +WITH NO DATA; + +CREATE MATERIALIZED VIEW IF NOT EXISTS chain_visualizer_blocks_view AS + SELECT * FROM block_headers +WITH NO DATA; +`) + down := batch(` +DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_blocks_view; +DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_blocks_with_parents_view; +DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_orphans_view; +DROP MATERIALIZED VIEW IF EXISTS chain_visualizer_chain_data_view; + `) + migrations.MustRegisterTx(up, down) +} diff --git a/tasks/views/chainvis.go b/tasks/views/chainvis.go new file mode 100644 index 000000000..909c4ef49 --- /dev/null +++ b/tasks/views/chainvis.go @@ -0,0 +1,51 @@ +package views + +import ( + "context" + "fmt" + "time" + + "golang.org/x/xerrors" + + "github.com/filecoin-project/sentinel-visor/storage" + "github.com/filecoin-project/sentinel-visor/wait" +) + +var chainVisViews = []string{ + "chain_visualizer_blocks_view", + "chain_visualizer_blocks_with_parents_view", + "chain_visualizer_chain_data_view", + "chain_visualizer_orphans_view", +} + +func NewChainVisRefresher(d *storage.Database, refreshRate time.Duration) *ChainVisRefresher { + return &ChainVisRefresher{ + db: d, + refreshRate: refreshRate, + } +} + +// ChainVisRefresher is a task which refreshes a set of views that support +// chain visualization queries at a specific refreshRate +type ChainVisRefresher struct { + db *storage.Database + refreshRate time.Duration +} + +// Run starts regularly refreshing until context is done or an error occurs +func (r *ChainVisRefresher) Run(ctx context.Context) error { + if r.refreshRate == 0 { + return nil + } + return wait.RepeatUntil(ctx, r.refreshRate, r.refreshView) +} + +func (r *ChainVisRefresher) refreshView(ctx context.Context) (bool, error) { + for _, v := range chainVisViews { + _, err := r.db.DB.ExecContext(ctx, fmt.Sprintf("REFRESH MATERIALIZED VIEW %s;", v)) + if err != nil { + return true, xerrors.Errorf("refresh %s: %w", v, err) + } + } + return false, nil +}