diff --git a/commands/index.go b/commands/index.go deleted file mode 100644 index 62dba3133..000000000 --- a/commands/index.go +++ /dev/null @@ -1,56 +0,0 @@ -package commands - -import ( - "github.com/urfave/cli/v2" - "golang.org/x/xerrors" - - indexer2 "github.com/filecoin-project/sentinel-visor/services/indexer" -) - -// TODO: rework to use new scheduler with just indexing tasks -var Index = &cli.Command{ - Name: "index", - Usage: "Index the lotus blockchain", - Action: func(cctx *cli.Context) error { - if err := setupLogging(cctx); err != nil { - return xerrors.Errorf("setup logging: %w", err) - } - - tcloser, err := setupTracing(cctx) - if err != nil { - return xerrors.Errorf("setup tracing: %w", err) - } - defer tcloser() - - ctx, rctx, err := setupStorageAndAPI(cctx) - if err != nil { - return err - } - defer func() { - rctx.closer() - if err := rctx.db.Close(ctx); err != nil { - log.Errorw("close database", "error", err) - } - }() - - indexer := indexer2.NewIndexer(rctx.db, rctx.api) - if err := indexer.InitHandler(ctx); err != nil { - return xerrors.Errorf("init indexer: %w", err) - } - - // Start the indexer and wait for it to complete or to be cancelled. - done := make(chan struct{}) - go func() { - defer close(done) - // TODO if the lotus daemon hangs up Start will exit. It should restart and wait for lotus to come back online. - err = indexer.Start(ctx) - }() - - select { - case <-ctx.Done(): - return nil - case <-done: - return err - } - }, -} diff --git a/commands/process.go b/commands/process.go deleted file mode 100644 index d598e9b21..000000000 --- a/commands/process.go +++ /dev/null @@ -1,62 +0,0 @@ -package commands - -import ( - "github.com/urfave/cli/v2" - "golang.org/x/xerrors" - - processor2 "github.com/filecoin-project/sentinel-visor/services/processor" -) - -// TODO: rework to use new scheduler with just actor state and message tasks -var Process = &cli.Command{ - Name: "process", - Usage: "Process indexed blocks of the lotus blockchain", - Flags: []cli.Flag{ - &cli.IntFlag{ - Name: "max-batch", - Value: 10, - }, - }, - Action: func(cctx *cli.Context) error { - if err := setupLogging(cctx); err != nil { - return xerrors.Errorf("setup logging: %w", err) - } - - tcloser, err := setupTracing(cctx) - if err != nil { - return xerrors.Errorf("setup tracing: %w", err) - } - defer tcloser() - - ctx, rctx, err := setupStorageAndAPI(cctx) - if err != nil { - return xerrors.Errorf("setup storage and api: %w", err) - } - defer func() { - rctx.closer() - if err := rctx.db.Close(ctx); err != nil { - log.Errorw("close database", "error", err) - } - }() - - processor := processor2.NewProcessor(rctx.db, rctx.api) - if err := processor.InitHandler(ctx, cctx.Int("max-batch")); err != nil { - return xerrors.Errorf("init processor: %w", err) - } - - // Start the processor and wait for it to complete or to be cancelled. 
- done := make(chan struct{}) - go func() { - defer close(done) - err = processor.Start(ctx) - }() - - select { - case <-ctx.Done(): - return nil - case <-done: - return err - } - - }, -} diff --git a/docker-compose.yml b/docker-compose.yml index a1feb0fd2..4f24f6813 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,36 +1,5 @@ version: '3.5' services: - redis: - container_name: redis - image: redis:6 - ports: - - "6379:6379" - environment: - - ALLOW_EMPTY_PASSWORD=yes - volumes: - - redis-data:/var/lib/redis - - redis-commander: - container_name: redis-commander - image: rediscommander/redis-commander:latest - restart: always - environment: - - REDIS_HOSTS=local:redis:6379 - ports: - - "8081:8081" - depends_on: - - redis - - workerui: - restart: always - environment: - - REDIS_HOSTS=redis://redis:6379 - build: - context: . - dockerfile: Dockerfile.worker - ports: - - "8181:8181" - timescaledb: container_name: timescaledb image: timescale/timescaledb:latest-pg10 diff --git a/go.mod b/go.mod index 099f8b304..04b44d2fe 100644 --- a/go.mod +++ b/go.mod @@ -15,15 +15,12 @@ require ( github.com/go-pg/migrations/v8 v8.0.1 github.com/go-pg/pg/v10 v10.3.1 github.com/go-pg/pgext v0.1.4 - github.com/gocraft/work v0.5.1 - github.com/gomodule/redigo v1.8.2 github.com/hashicorp/golang-lru v0.5.4 github.com/ipfs/go-cid v0.0.7 github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4 github.com/lib/pq v1.8.0 github.com/mattn/go-isatty v0.0.12 // indirect github.com/multiformats/go-multiaddr v0.3.1 - github.com/robfig/cron v1.2.0 // indirect github.com/stretchr/objx v0.2.0 // indirect github.com/stretchr/testify v1.6.1 github.com/urfave/cli/v2 v2.2.0 diff --git a/go.sum b/go.sum index 0a82df6ff..b557219f9 100644 --- a/go.sum +++ b/go.sum @@ -152,7 +152,6 @@ github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.0.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= @@ -179,12 +178,10 @@ github.com/dgraph-io/badger v1.6.0-rc1/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhY github.com/dgraph-io/badger v1.6.0/go.mod h1:zwt7syl517jmP8s94KqSxTlM6IMsdhYy6psNgSztDR4= github.com/dgraph-io/badger v1.6.1 h1:w9pSFNSdq/JPM1N12Fz/F/bzo993Is1W+Q7HjPzi7yg= github.com/dgraph-io/badger v1.6.1/go.mod h1:FRmFw3uxvcpa8zG3Rxs0th+hCLIuaQg8HlNV5bjgnuU= -github.com/dgraph-io/badger/v2 v2.0.3 h1:inzdf6VF/NZ+tJ8RwwYMjJMvsOALTHYdozn0qSl6XJI= github.com/dgraph-io/badger/v2 v2.0.3/go.mod h1:3KY8+bsP8wI0OEnQJAKpd4wIJW/Mm32yw2j/9FUVnIM= github.com/dgraph-io/badger/v2 v2.2007.2 h1:EjjK0KqwaFMlPin1ajhP943VPENHJdEz1KLIegjaI3k= github.com/dgraph-io/badger/v2 v2.2007.2/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= github.com/dgraph-io/ristretto v0.0.2-0.20200115201040-8f368f2f2ab3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= -github.com/dgraph-io/ristretto v0.0.2 h1:a5WaUrDa0qm0YrAAS1tUykT5El3kt62KNZZeMxQn3po= 
github.com/dgraph-io/ristretto v0.0.2/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de h1:t0UHb5vdojIDUqktM6+xJAfScFBsVpXZmqC9dsgJmeA= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= @@ -256,7 +253,6 @@ github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261 h github.com/filecoin-project/go-paramfetch v0.0.2-0.20200701152213-3e0f0afdc261/go.mod h1:fZzmf4tftbwf9S37XRifoJlz7nCjRdIrMGLR07dKLCc= github.com/filecoin-project/go-state-types v0.0.0-20200903145444-247639ffa6ad/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I= github.com/filecoin-project/go-state-types v0.0.0-20200904021452-1883f36ca2f4/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I= -github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df h1:m2esXSuGBkuXlRyCsl1a/7/FkFam63o1OzIgzaHtOfI= github.com/filecoin-project/go-state-types v0.0.0-20200905071437-95828685f9df/go.mod h1:IQ0MBPnonv35CJHtWSN3YY1Hz2gkPru1Q9qoaYLxx9I= github.com/filecoin-project/go-state-types v0.0.0-20200911004822-964d6c679cfc h1:1vr/LoqGq5m5g37Q3sNSAjfwF1uJY0zmiHcvnxY6hik= github.com/filecoin-project/go-state-types v0.0.0-20200911004822-964d6c679cfc/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g= @@ -270,7 +266,6 @@ github.com/filecoin-project/go-storedcounter v0.0.0-20200421200003-1c99c62e8a5b/ github.com/filecoin-project/lotus v0.8.0 h1:V6wVAYoFyuvJ10vZ5eGwxs/QatzjEL2yjliVRTWoC0g= github.com/filecoin-project/lotus v0.8.0/go.mod h1:31qmN62Dm8APRHXmW/hMbqKcrkoPVISERSX7RGQQbOs= github.com/filecoin-project/specs-actors v0.9.4/go.mod h1:BStZQzx5x7TmCkLv0Bpa07U6cPKol6fd3w9KjMPZ6Z4= -github.com/filecoin-project/specs-actors v0.9.7 h1:7PAZ8kdqwBdmgf/23FCkQZLCXcVu02XJrkpkhBikiA8= github.com/filecoin-project/specs-actors v0.9.7/go.mod h1:wM2z+kwqYgXn5Z7scV1YHLyd1Q1cy0R8HfTIWQ0BFGU= github.com/filecoin-project/specs-actors v0.9.11 h1:TnpG7HAeiUrfj0mJM7UaPW0P2137H62RGof7ftT5Mas= github.com/filecoin-project/specs-actors v0.9.11/go.mod h1:czlvLQGEX0fjLLfdNHD7xLymy6L3n7aQzRWzsYGf+ys= @@ -320,8 +315,6 @@ github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34= -github.com/gocraft/work v0.5.1/go.mod h1:pc3n9Pb5FAESPPGfM0nL+7Q1xtgtRnF8rr/azzhQVlM= github.com/godbus/dbus v0.0.0-20190402143921-271e53dc4968/go.mod h1:/YcGZj5zSblfDWMMoOzV4fas9FZnQYTkDnsGvmh2Grw= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/gogo/googleapis v0.0.0-20180223154316-0cd9801be74a/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= @@ -371,8 +364,6 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gomodule/redigo v1.8.2 h1:H5XSIre1MB5NbPYFp+i1NBbb5qN1W8Y8YAQoAYbkm8k= -github.com/gomodule/redigo v1.8.2/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= 
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -547,7 +538,6 @@ github.com/ipfs/go-fs-lock v0.0.6/go.mod h1:OTR+Rj9sHiRubJh3dRhD15Juhd/+w6VPOY28 github.com/ipfs/go-graphsync v0.1.0/go.mod h1:jMXfqIEDFukLPZHqDPp8tJMbHO9Rmeb9CEGevngQbmE= github.com/ipfs/go-graphsync v0.2.1 h1:MdehhqBSuTI2LARfKLkpYnt0mUrqHs/mtuDnESXHBfU= github.com/ipfs/go-graphsync v0.2.1/go.mod h1:gEBvJUNelzMkaRPJTpg/jaKN4AQW/7wDWu0K92D8o10= -github.com/ipfs/go-hamt-ipld v0.1.1 h1:0IQdvwnAAUKmDE+PMJa5y1QiwOPHpI9+eAbQEEEYthk= github.com/ipfs/go-hamt-ipld v0.1.1/go.mod h1:1EZCr2v0jlCnhpa+aZ0JZYp8Tt2w16+JJOAVz17YcDk= github.com/ipfs/go-ipfs-blockstore v0.0.1/go.mod h1:d3WClOmRQKFnJ0Jz/jj/zmksX0ma1gROTlovZKBmN08= github.com/ipfs/go-ipfs-blockstore v0.1.0/go.mod h1:5aD0AvHPi7mZc6Ci1WCAhiBQu2IsfTduLl+422H6Rqw= @@ -1280,14 +1270,11 @@ github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmO github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= -github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= -github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v1.6.0 h1:G9tHG9lebljV9mfp9SNPDL36nCDxmo3zTlAf1YgvzmI= github.com/rs/cors v1.6.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= -github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= @@ -1394,7 +1381,6 @@ github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/ github.com/uber/jaeger-lib v2.2.0+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4= diff --git a/main.go b/main.go index 3fd499b34..e53fc4703 100644 --- a/main.go +++ b/main.go @@ -57,7 +57,7 @@ func main() { &cli.BoolFlag{ Name: "tracing", EnvVars: []string{"VISOR_TRACING"}, - Value: true, + Value: false, }, &cli.StringFlag{ Name: "jaeger-agent-host", @@ -91,8 +91,6 @@ func main() { }, }, Commands: []*cli.Command{ 
- commands.Process, - commands.Index, commands.Migrate, commands.Run, }, diff --git a/services/indexer/indexer.go b/services/indexer/indexer.go deleted file mode 100644 index 6b8ceee56..000000000 --- a/services/indexer/indexer.go +++ /dev/null @@ -1,207 +0,0 @@ -package indexer - -import ( - "container/list" - "context" - - lotus_api "github.com/filecoin-project/lotus/api" - pg "github.com/go-pg/pg/v10" - cid "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - "go.opentelemetry.io/otel/api/trace" - "go.opentelemetry.io/otel/label" - "golang.org/x/xerrors" - - store "github.com/filecoin-project/lotus/chain/store" - types "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/sentinel-visor/lens" - storage "github.com/filecoin-project/sentinel-visor/storage" -) - -var log = logging.Logger("indexer") - -// TODO figure our if you want this or the init handler -func NewIndexer(s *storage.Database, n lens.API) *Indexer { - return &Indexer{ - storage: s, - node: n, - } -} - -type Indexer struct { - storage *storage.Database - node lens.API - - startingHeight int64 - startingBlock cid.Cid - genesis *types.TipSet - - // TODO base this value on the spec: https://github.com/filecoin-project/specs-actors/pull/702 - finality int -} - -// InitHandler initializes Indexer with state needed to start sycning head events -func (i *Indexer) InitHandler(ctx context.Context) error { - gen, err := i.node.ChainGetGenesis(ctx) - if err != nil { - return xerrors.Errorf("get genesis: %w", err) - } - i.genesis = gen - blk, height, err := i.mostRecentlySyncedBlockHeight(ctx) - if err != nil { - return xerrors.Errorf("get synced block height: %w", err) - } - - finality := 1400 - i.startingBlock = blk - i.startingHeight = height - i.finality = finality - - log.Infow("initialized Indexer", "startingBlock", blk.String(), "startingHeight", height, "finality", finality) - return nil -} - -// Start runs the Indexer which blocks processing chain events until the context is cancelled or the api closes the -// connection. -func (i *Indexer) Start(ctx context.Context) error { - log.Info("starting Indexer") - hc, err := i.node.ChainNotify(ctx) - if err != nil { - return xerrors.Errorf("chain notify: %w", err) - } - - for { - select { - case <-ctx.Done(): - log.Info("stopping Indexer") - return nil - case headEvents, ok := <-hc: - if !ok { - log.Warn("ChainNotify channel closed, stopping Indexer") - return nil - } - if err := i.index(ctx, headEvents); err != nil { - return xerrors.Errorf("index: %w", err) - } - } - } -} - -func (i *Indexer) index(ctx context.Context, headEvents []*lotus_api.HeadChange) error { - ctx, span := global.Tracer("").Start(ctx, "Indexer.index") - defer span.End() - - for _, head := range headEvents { - log.Debugw("index", "event", head.Type) - switch head.Type { - case store.HCCurrent: - fallthrough - case store.HCApply: - // collect all blocks to index starting from head and walking down the chain - toIndex, err := i.collectBlocksToIndex(ctx, head.Val, i.startingHeight) - if err != nil { - return xerrors.Errorf("collect blocks: %w", err) - } - - // if there are no new blocks short circuit - if toIndex.Size() == 0 { - return nil - } - - // persist the blocks to storage - if err := toIndex.Persist(ctx, i.storage.DB); err != nil { - return xerrors.Errorf("persist: %w", err) - } - - // keep the heights block we have seen so we don't recollect it. 
- i.startingBlock, i.startingHeight = toIndex.Highest() - case store.HCRevert: - - // TODO - } - } - return nil -} - -// Read Operations // - -// TODO not sure if returning a map here is required, it gets passed to the publisher and then storage -// which doesn't need the CID key. I think we are just doing this for deduplication. -func (i *Indexer) collectBlocksToIndex(ctx context.Context, head *types.TipSet, maxHeight int64) (*UnindexedBlockData, error) { - ctx, span := global.Tracer("").Start(ctx, "Indexer.CollectBlocks", trace.WithAttributes(label.Int64("height", int64(head.Height())))) - defer span.End() - - // get at most finality blocks not exceeding maxHeight. These are blocks we have in the database but have not processed. - // Now we are going to walk down the chain from `head` until we have visited all blocks not in the database. - synced, err := i.storage.UnprocessedIndexedBlocks(ctx, int(maxHeight), i.finality) - if err != nil { - return nil, xerrors.Errorf("get unprocessed blocks: %w", err) - } - log.Infow("collect synced blocks", "count", len(synced)) - // well, this is complete shit - has := make(map[cid.Cid]struct{}) - for _, c := range synced { - key, err := cid.Decode(c.Cid) - if err != nil { - return nil, xerrors.Errorf("decode cid: %w", err) - } - has[key] = struct{}{} - } - // walk backwards from head until we find a block that we have - - toSync := NewUnindexedBlockData() - toVisit := list.New() - - for _, header := range head.Blocks() { - toVisit.PushBack(header) - } - - for toVisit.Len() > 0 { - bh := toVisit.Remove(toVisit.Back()).(*types.BlockHeader) - _, has := has[bh.Cid()] - if seen := toSync.Has(bh); seen || has { - continue - } - - toSync.Add(bh) - - if toSync.Size()%500 == 10 { - log.Debugw("to visit", "toVisit", toVisit.Len(), "toSync", toSync.Size(), "current_height", bh.Height) - } - - if bh.Height == 0 { - continue - } - - pts, err := i.node.ChainGetTipSet(ctx, types.NewTipSetKey(bh.Parents...)) - if err != nil { - return nil, xerrors.Errorf("get tipset: %w", err) - } - - for _, header := range pts.Blocks() { - toVisit.PushBack(header) - } - } - - log.Debugw("collected unsynced blocks", "count", toSync.Size()) - return toSync, nil -} - -func (i *Indexer) mostRecentlySyncedBlockHeight(ctx context.Context) (cid.Cid, int64, error) { - ctx, span := global.Tracer("").Start(ctx, "Indexer.mostRecentlySyncedBlockHeight") - defer span.End() - - task, err := i.storage.MostRecentSyncedBlock(ctx) - if err != nil { - if err == pg.ErrNoRows { - return i.genesis.Cids()[0], 0, nil - } - return cid.Undef, 0, xerrors.Errorf("query recent synced: %w", err) - } - c, err := cid.Decode(task.Cid) - if err != nil { - return cid.Undef, 0, xerrors.Errorf("decode cid: %w", err) - } - return c, task.Height, nil -} diff --git a/services/indexer/indexer_test.go b/services/indexer/indexer_test.go deleted file mode 100644 index 56e46f4c6..000000000 --- a/services/indexer/indexer_test.go +++ /dev/null @@ -1,237 +0,0 @@ -package indexer - -import ( - "context" - "os" - "testing" - "time" - - "github.com/filecoin-project/go-state-types/abi" - "github.com/filecoin-project/go-state-types/big" - apitest "github.com/filecoin-project/lotus/api/test" - "github.com/filecoin-project/lotus/build" - types "github.com/filecoin-project/lotus/chain/types" - nodetest "github.com/filecoin-project/lotus/node/test" - "github.com/filecoin-project/specs-actors/actors/builtin/miner" - "github.com/filecoin-project/specs-actors/actors/builtin/power" - 
"github.com/filecoin-project/specs-actors/actors/builtin/verifreg" - "github.com/filecoin-project/specs-actors/actors/util/adt" - - "github.com/go-pg/pg/v10" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model/blocks" - "github.com/filecoin-project/sentinel-visor/storage" - "github.com/filecoin-project/sentinel-visor/testutil" -) - -func init() { - build.InsecurePoStValidation = true - err := os.Setenv("TRUST_PARAMS", "1") - if err != nil { - panic(err) - } - miner.SupportedProofTypes = map[abi.RegisteredSealProof]struct{}{ - abi.RegisteredSealProof_StackedDrg2KiBV1: {}, - } - power.ConsensusMinerMinPower = big.NewInt(2048) - verifreg.MinVerifiedDealSize = big.NewInt(256) -} - -type nodeWrapper struct { - apitest.TestNode -} - -func (nodeWrapper) Store() adt.Store { - panic("not supported") -} - -func TestIndex(t *testing.T) { - if testing.Short() || !testutil.DatabaseAvailable() { - t.Skip("short testing requested or VISOR_TEST_DB not set") - } - - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() - - db, cleanup, err := testutil.WaitForExclusiveDatabase(ctx, t) - require.NoError(t, err) - defer cleanup() - - t.Logf("truncating database tables") - err = truncateBlockTables(db) - require.NoError(t, err, "truncating tables") - - t.Logf("preparing chain") - nodes, sn := nodetest.Builder(t, 1, apitest.OneMiner) - node := nodeWrapper{TestNode: nodes[0]} - - apitest.MineUntilBlock(ctx, t, nodes[0], sn[0], nil) - - head, err := node.ChainHead(ctx) - require.NoError(t, err, "chain head") - - t.Logf("collecting chain blocks") - bhs, err := collectBlockHeaders(node, head) - require.NoError(t, err, "collect chain blocks") - - cids := bhs.Cids() - rounds := bhs.Rounds() - - d := &storage.Database{DB: db} - t.Logf("initializing indexer") - idx := NewIndexer(d, node) - err = idx.InitHandler(ctx) - require.NoError(t, err, "init handler") - - newHeads, err := node.ChainNotify(ctx) - require.NoError(t, err, "chain notify") - - t.Logf("indexing chain") - nh := <-newHeads - err = idx.index(ctx, nh) - require.NoError(t, err, "index") - - t.Run("blocks_synced", func(t *testing.T) { - var count int - _, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM blocks_synced`) - require.NoError(t, err) - assert.Equal(t, len(cids), count) - - var m *blocks.BlockSynced - for _, cid := range cids { - exists, err := db.Model(m).Where("cid = ?", cid).Exists() - require.NoError(t, err) - assert.True(t, exists, "cid: %s", cid) - } - }) - - t.Run("block_headers", func(t *testing.T) { - var count int - _, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM block_headers`) - require.NoError(t, err) - assert.Equal(t, len(cids), count) - - var m *blocks.BlockHeader - for _, cid := range cids { - exists, err := db.Model(m).Where("cid = ?", cid).Exists() - require.NoError(t, err) - assert.True(t, exists, "cid: %s", cid) - } - }) - - t.Run("block_parents", func(t *testing.T) { - var count int - _, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM block_parents`) - require.NoError(t, err) - assert.Equal(t, len(cids), count) - - var m *blocks.BlockParent - for _, cid := range cids { - exists, err := db.Model(m).Where("block = ?", cid).Exists() - require.NoError(t, err) - assert.True(t, exists, "block: %s", cid) - } - }) - - t.Run("drand_entries", func(t *testing.T) { - var count int - _, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM 
drand_entries`) - require.NoError(t, err) - assert.Equal(t, len(rounds), count) - - var m *blocks.DrandEntrie - for _, round := range rounds { - exists, err := db.Model(m).Where("round = ?", round).Exists() - require.NoError(t, err) - assert.True(t, exists, "round: %d", round) - } - }) - - t.Run("drand_block_entries", func(t *testing.T) { - var count int - _, err := db.QueryOne(pg.Scan(&count), `SELECT COUNT(*) FROM drand_block_entries`) - require.NoError(t, err) - assert.Equal(t, len(rounds), count) - - var m *blocks.DrandBlockEntrie - for _, round := range rounds { - exists, err := db.Model(m).Where("round = ?", round).Exists() - require.NoError(t, err) - assert.True(t, exists, "round: %d", round) - } - }) - -} - -type blockHeaderList []*types.BlockHeader - -func (b blockHeaderList) Cids() []string { - var cids []string - for _, bh := range b { - cids = append(cids, bh.Cid().String()) - } - return cids -} - -func (b blockHeaderList) Rounds() []uint64 { - var rounds []uint64 - for _, bh := range b { - for _, ent := range bh.BeaconEntries { - rounds = append(rounds, ent.Round) - } - } - - return rounds -} - -// collectBlockHeaders walks the chain to collect blocks that should be indexed -func collectBlockHeaders(n lens.API, ts *types.TipSet) (blockHeaderList, error) { - blocks := ts.Blocks() - - for _, bh := range ts.Blocks() { - if bh.Height == 0 { - continue - } - - parent, err := n.ChainGetTipSet(context.TODO(), types.NewTipSetKey(bh.Parents...)) - if err != nil { - return nil, err - } - - pblocks, err := collectBlockHeaders(n, parent) - if err != nil { - return nil, err - } - blocks = append(blocks, pblocks...) - - } - return blocks, nil -} - -// truncateBlockTables ensures the indexing tables are empty -func truncateBlockTables(db *pg.DB) error { - if _, err := db.Exec(`TRUNCATE TABLE blocks_synced`); err != nil { - return err - } - - if _, err := db.Exec(`TRUNCATE TABLE block_headers`); err != nil { - return err - } - - if _, err := db.Exec(`TRUNCATE TABLE block_parents`); err != nil { - return err - } - - if _, err := db.Exec(`TRUNCATE TABLE drand_entries`); err != nil { - return err - } - - if _, err := db.Exec(`TRUNCATE TABLE drand_block_entries`); err != nil { - return err - } - - return nil -} diff --git a/services/indexer/types.go b/services/indexer/types.go deleted file mode 100644 index cf62b1492..000000000 --- a/services/indexer/types.go +++ /dev/null @@ -1,126 +0,0 @@ -package indexer - -import ( - "context" - - "github.com/go-pg/pg/v10" - "github.com/ipfs/go-cid" - "go.opentelemetry.io/otel/api/global" - "golang.org/x/sync/errgroup" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/chain/types" - - "github.com/filecoin-project/sentinel-visor/model/blocks" -) - -type ActorTips map[types.TipSetKey][]ActorInfo - -type ActorInfo struct { - Actor types.Actor - Address address.Address - ParentStateRoot cid.Cid - TipSet types.TipSetKey - ParentTipSet types.TipSetKey -} - -func NewUnindexedBlockData() *UnindexedBlockData { - return &UnindexedBlockData{ - has: make(map[cid.Cid]struct{}), - } -} - -// TODO put this somewhere else, maybe in the model? 
-type UnindexedBlockData struct { - has map[cid.Cid]struct{} - highest *types.BlockHeader - - blks blocks.BlockHeaders - synced blocks.BlocksSynced - parents blocks.BlockParents - drandEntries blocks.DrandEntries - drandBlockEntries blocks.DrandBlockEntries -} - -func (u *UnindexedBlockData) Highest() (cid.Cid, int64) { - return u.highest.Cid(), int64(u.highest.Height) -} - -func (u *UnindexedBlockData) Add(bh *types.BlockHeader) { - u.has[bh.Cid()] = struct{}{} - - if u.highest == nil { - u.highest = bh - } else if u.highest.Height < bh.Height { - u.highest = bh - } - - u.blks = append(u.blks, blocks.NewBlockHeader(bh)) - u.synced = append(u.synced, blocks.NewBlockSynced(bh)) - u.parents = append(u.parents, blocks.NewBlockParents(bh)...) - u.drandEntries = append(u.drandEntries, blocks.NewDrandEnties(bh)...) - u.drandBlockEntries = append(u.drandBlockEntries, blocks.NewDrandBlockEntries(bh)...) -} - -func (u *UnindexedBlockData) Has(bh *types.BlockHeader) bool { - _, has := u.has[bh.Cid()] - return has -} - -func (u *UnindexedBlockData) Persist(ctx context.Context, db *pg.DB) error { - ctx, span := global.Tracer("").Start(ctx, "Indexer.PersistBlockData") - defer span.End() - - return db.RunInTransaction(ctx, func(tx *pg.Tx) error { - log.Infow("Persist unindexed block data", "count", u.Size()) - grp, ctx := errgroup.WithContext(ctx) - - grp.Go(func() error { - if err := u.blks.PersistWithTx(ctx, tx); err != nil { - return xerrors.Errorf("persist block headers: %w", err) - } - return nil - }) - - grp.Go(func() error { - if err := u.synced.PersistWithTx(ctx, tx); err != nil { - return xerrors.Errorf("persist blocks synced: %w", err) - } - return nil - }) - - grp.Go(func() error { - if err := u.parents.PersistWithTx(ctx, tx); err != nil { - return xerrors.Errorf("persist block parents: %w", err) - } - return nil - }) - - grp.Go(func() error { - if err := u.drandEntries.PersistWithTx(ctx, tx); err != nil { - return xerrors.Errorf("persist drand entries: %w", err) - } - return nil - }) - - grp.Go(func() error { - if err := u.drandBlockEntries.PersistWithTx(ctx, tx); err != nil { - return xerrors.Errorf("persist drand block entries: %w", err) - } - return nil - }) - - if err := grp.Wait(); err != nil { - log.Info("Rolling back unindexed block data", "error", err) - return err - } - - log.Info("Committing unindexed block data") - return nil - }) -} - -func (u *UnindexedBlockData) Size() int { - return len(u.has) -} diff --git a/services/processor/processor.go b/services/processor/processor.go deleted file mode 100644 index e3b45a5dd..000000000 --- a/services/processor/processor.go +++ /dev/null @@ -1,213 +0,0 @@ -package processor - -import ( - "context" - "strings" - "sync" - "time" - - "github.com/gocraft/work" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - "go.opentelemetry.io/otel/api/trace" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - types "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/lib/parmap" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - "github.com/filecoin-project/sentinel-visor/services/indexer" - "github.com/filecoin-project/sentinel-visor/storage" -) - -func NewProcessor(db *storage.Database, n lens.API) *Processor { - // TODO I don't like how these are buried in here. 
- pubCh := make(chan model.Persistable) - p := NewPublisher(db, pubCh) - s := NewScheduler(n, pubCh) - return &Processor{ - storage: db, - node: n, - scheduler: s, - publisher: p, - log: logging.Logger("processor"), - } -} - -type Processor struct { - storage *storage.Database - node lens.API - - scheduler *Scheduler - publisher *Publisher - - log *logging.ZapEventLogger - tracer trace.Tracer - - // we will want to spcial case the processing of the genesis state. - genesis *types.TipSet - - batchSize int - - pool *work.WorkerPool - tipsetQueue *work.Enqueuer -} - -func (p *Processor) InitHandler(ctx context.Context, batchSize int) error { - p.publisher.Start(ctx) - p.scheduler.Start() - - gen, err := p.node.ChainGetGenesis(ctx) - if err != nil { - return xerrors.Errorf("get genesis: %w", err) - } - - p.genesis = gen - p.batchSize = batchSize - - if _, err := p.scheduler.queueGenesisTask(gen.Key(), gen.ParentState()); err != nil { - return xerrors.Errorf("queue genesis task: %w", err) - } - - p.log.Infow("initialized processor", "genesis", gen.String()) - return nil -} - -func (p *Processor) Start(ctx context.Context) error { - p.log.Info("starting processor") - // Ensure the scheduler stops the workers and associated processes before exiting. - defer p.scheduler.Stop() - for { - select { - case <-ctx.Done(): - p.log.Info("stopping processor") - return nil - default: - err := p.process(ctx) - if err != nil { - return xerrors.Errorf("process: %w", err) - } - } - } -} - -func (p *Processor) process(ctx context.Context) error { - ctx, span := global.Tracer("").Start(ctx, "Processor.process") - defer span.End() - - blksToProcess, err := p.collectBlocksToProcess(ctx, p.batchSize) - if err != nil { - return xerrors.Errorf("collect blocks: %w", err) - } - - if len(blksToProcess) == 0 { - p.log.Info("no blocks to process, waiting 30 seconds") - time.Sleep(time.Second * 30) - return nil - } - p.log.Infow("collected blocks for processing", "count", len(blksToProcess)) - - actorChanges, err := p.collectActorChanges(ctx, blksToProcess) - if err != nil { - return xerrors.Errorf("collect actor changes: %w", err) - } - - p.log.Infow("collected actor changes") - - if err := p.scheduler.Dispatch(actorChanges); err != nil { - return xerrors.Errorf("dispatch: %w", err) - } - - return nil -} - -func (p *Processor) collectActorChanges(ctx context.Context, blks []*types.BlockHeader) (map[types.TipSetKey][]indexer.ActorInfo, error) { - - out := make(map[types.TipSetKey][]indexer.ActorInfo) - var outMu sync.Mutex - parmap.Par(25, blks, func(blk *types.BlockHeader) { - ctx, span := global.Tracer("").Start(ctx, "Processor.collectActorChanges") - defer span.End() - - pts, err := p.node.ChainGetTipSet(ctx, types.NewTipSetKey(blk.Parents...)) - if err != nil { - p.log.Error(err) - return - } - - changes, err := p.node.StateChangedActors(ctx, pts.ParentState(), blk.ParentStateRoot) - if err != nil { - p.log.Error(err) - return - } - - for str, act := range changes { - addr, err := address.NewFromString(str) - if err != nil { - p.log.Error(err) - continue - } - - _, err = p.node.StateGetActor(ctx, addr, pts.Key()) - if err != nil { - if strings.Contains(err.Error(), "actor not found") { - // TODO consider tracking deleted actors - continue - } - p.log.Error(err) - return - } - - _, err = p.node.StateGetActor(ctx, addr, pts.Parents()) - if err != nil { - if strings.Contains(err.Error(), "actor not found") { - // TODO consider tracking deleted actors - continue - } - p.log.Error(err) - return - } - - // TODO track null 
rounds - outMu.Lock() - out[pts.Key()] = append(out[pts.Key()], indexer.ActorInfo{ - Actor: act, - Address: addr, - TipSet: pts.Key(), - ParentTipSet: pts.Parents(), - ParentStateRoot: pts.ParentState(), - }) - outMu.Unlock() - } - - }) - return out, nil -} - -func (p *Processor) collectBlocksToProcess(ctx context.Context, batch int) ([]*types.BlockHeader, error) { - ctx, span := global.Tracer("").Start(ctx, "Processor.collectBlocksToProcess") - defer span.End() - - blks, err := p.storage.CollectAndMarkBlocksAsProcessing(ctx, batch) - if err != nil { - return nil, err - } - - out := make([]*types.BlockHeader, len(blks)) - for idx, blk := range blks { - blkCid, err := cid.Decode(blk.Cid) - if err != nil { - return nil, xerrors.Errorf("decode cid: %w", err) - } - - header, err := p.node.ChainGetBlock(ctx, blkCid) - if err != nil { - return nil, xerrors.Errorf("get block: %w", err) - } - out[idx] = header - } - return out, nil -} diff --git a/services/processor/publisher.go b/services/processor/publisher.go deleted file mode 100644 index 2e64d8f4d..000000000 --- a/services/processor/publisher.go +++ /dev/null @@ -1,45 +0,0 @@ -package processor - -import ( - "context" - - logging "github.com/ipfs/go-log/v2" - - "github.com/filecoin-project/sentinel-visor/model" - "github.com/filecoin-project/sentinel-visor/storage" -) - -func NewPublisher(s *storage.Database, pubCh <-chan model.Persistable) *Publisher { - return &Publisher{ - storage: s, - pubCh: pubCh, - log: logging.Logger("publisher"), - } -} - -type Publisher struct { - storage *storage.Database - pubCh <-chan model.Persistable - log *logging.ZapEventLogger -} - -func (p *Publisher) Start(ctx context.Context) { - p.log.Info("starting publisher") - go func() { - for { - select { - case <-ctx.Done(): - p.log.Info("stopping publisher") - return - case persistable := <-p.pubCh: - go func() { - if err := persistable.Persist(ctx, p.storage.DB); err != nil { - // TODO handle this case with a retry - p.log.Error("persisting", "error", err.Error()) - } - }() - } - } - }() - -} diff --git a/services/processor/scheduler.go b/services/processor/scheduler.go deleted file mode 100644 index 7e9eba92d..000000000 --- a/services/processor/scheduler.go +++ /dev/null @@ -1,341 +0,0 @@ -package processor - -import ( - "os" - "strconv" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/builtin" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - "github.com/filecoin-project/sentinel-visor/services/indexer" - "github.com/filecoin-project/sentinel-visor/services/processor/tasks/common" - "github.com/filecoin-project/sentinel-visor/services/processor/tasks/genesis" - init_ "github.com/filecoin-project/sentinel-visor/services/processor/tasks/init" - "github.com/filecoin-project/sentinel-visor/services/processor/tasks/market" - "github.com/filecoin-project/sentinel-visor/services/processor/tasks/message" - "github.com/filecoin-project/sentinel-visor/services/processor/tasks/miner" - "github.com/filecoin-project/sentinel-visor/services/processor/tasks/power" - "github.com/filecoin-project/sentinel-visor/services/processor/tasks/reward" -) - -var log = logging.Logger("scheduler") - -const ( - EnvRedisMaxActive = "VISOR_REDIS_MAX_ACTIVE" - EnvRedisMaxIdle = "VISOR_REDIS_MAX_IDLE" - EnvRedisNetwork = 
"VISOR_REDIS_NETWORK" - EnvRedisAddress = "VISOR_REDIS_ADDRESS" -) - -var ( - RedisMaxActive int64 - RedisMaxIdle int64 - RedisNetwork string - RedisAddress string -) - -func init() { - RedisMaxActive = 128 - RedisMaxIdle = 128 - RedisNetwork = "tcp" - RedisAddress = ":6379" - - if maxActiveStr := os.Getenv(EnvRedisMaxActive); maxActiveStr != "" { - max, err := strconv.ParseInt(maxActiveStr, 10, 64) - if err != nil { - log.Errorw("setting redis max active", "error", err) - } else { - RedisMaxActive = max - } - } - - if maxIdlStr := os.Getenv(EnvRedisMaxIdle); maxIdlStr != "" { - max, err := strconv.ParseInt(maxIdlStr, 10, 64) - if err != nil { - log.Errorw("setting redis max idle", "error", err) - } else { - RedisMaxIdle = max - } - } - - if network := os.Getenv(EnvRedisNetwork); network != "" { - RedisNetwork = network - } - - if address := os.Getenv(EnvRedisAddress); address != "" { - RedisAddress = address - } -} - -const ( - GenesisTaskName = "process_genesis" - GenesisPoolName = "genesis_tasks" - - InitActorTaskName = "process_init_actor" - InitActorPoolName = "init_actor_tasks" - - MinerTaskName = "process_miner" - MinerPoolName = "miner_actor_tasks" - - MarketTaskName = "process_market" - MarketPoolName = "market_actor_tasks" - - MessageTaskName = "process_message" - MessagePoolName = "message_tasks" - - PowerTaskName = "process_power" - PowerPoolName = "power_actor_tasks" - - RewardTaskName = "process_reward" - RewardPoolName = "reward_tasks" - - CommonTaskName = "process_common" - CommonPoolName = "common_actor_tasks" -) - -func NewScheduler(node lens.API, pubCh chan<- model.Persistable) *Scheduler { - // Make a redis pool - var redisPool = &redis.Pool{ - MaxActive: int(RedisMaxActive), - MaxIdle: int(RedisMaxIdle), - Wait: true, - Dial: func() (redis.Conn, error) { - return redis.Dial(RedisNetwork, RedisAddress) - }, - } - - genesisPool, genesisQueue := genesis.Setup(1, GenesisTaskName, GenesisPoolName, redisPool, node, pubCh) - minerPool, minerQueue := miner.Setup(64, MinerTaskName, MinerPoolName, redisPool, node, pubCh) - marketPool, marketQueue := market.Setup(64, MarketTaskName, MarketPoolName, redisPool, node, pubCh) - msgPool, msgQueue := message.Setup(64, MessageTaskName, MessagePoolName, redisPool, node, pubCh) - powerPool, powerQueue := power.Setup(64, PowerTaskName, PowerPoolName, redisPool, node, pubCh) - rwdPool, rwdQueue := reward.Setup(4, RewardTaskName, RewardPoolName, redisPool, node, pubCh) - comPool, comQueue := common.Setup(64, CommonTaskName, CommonPoolName, redisPool, node, pubCh) - initPool, initQueue := init_.Setup(64, InitActorTaskName, InitActorPoolName, redisPool, node, pubCh) - - pools := []*work.WorkerPool{genesisPool, minerPool, marketPool, powerPool, msgPool, rwdPool, comPool, initPool} - - queues := map[string]*work.Enqueuer{ - GenesisTaskName: genesisQueue, - MinerTaskName: minerQueue, - MarketTaskName: marketQueue, - MessageTaskName: msgQueue, - PowerTaskName: powerQueue, - RewardTaskName: rwdQueue, - CommonTaskName: comQueue, - InitActorTaskName: initQueue, - } - - return &Scheduler{ - pools: pools, - queues: queues, - } -} - -type Scheduler struct { - pools []*work.WorkerPool - queues map[string]*work.Enqueuer -} - -func (s *Scheduler) Start() { - for _, pool := range s.pools { - pool.Start() - } -} - -func (s *Scheduler) Stop() { - for _, pool := range s.pools { - // pool.Drain() - // TODO wait for pools to drain before stopping them, will require coordination with the - // processor such that no more tasks are added else Drain will never 
return. - pool.Stop() - } -} - -func (s *Scheduler) Dispatch(tips indexer.ActorTips) error { - for ts, actors := range tips { - // TODO this is pretty ugly, need some local cache of tsKey to ts in here somewhere. - _, err := s.queueMessageTask(ts, actors[0].ParentStateRoot) - if err != nil { - return xerrors.Errorf("queue message task: %w", err) - } - for _, actor := range actors { - _, err := s.queueCommonTask(actor) - if err != nil { - return xerrors.Errorf("queue common task: %w", err) - } - switch actor.Actor.Code { - case builtin.InitActorCodeID: - _, err := s.queueInitTask(actor) - if err != nil { - return xerrors.Errorf("queue init task: %w", err) - } - case builtin.StorageMinerActorCodeID: - _, err := s.queueMinerTask(actor) - if err != nil { - return xerrors.Errorf("queue miner task: %w", err) - } - case builtin.StorageMarketActorCodeID: - _, err := s.queueMarketTask(actor) - if err != nil { - return xerrors.Errorf("queue market task: %w", err) - } - case builtin.StoragePowerActorCodeID: - _, err := s.queuePowerTask(actor) - if err != nil { - return xerrors.Errorf("queue power task: %w", err) - } - case builtin.RewardActorCodeID: - _, err := s.queueRewardTask(actor) - if err != nil { - return xerrors.Errorf("queue reward task: %w", err) - } - } - } - } - return nil -} - -func (s *Scheduler) queueMinerTask(info indexer.ActorInfo) (*work.Job, error) { - tsB, err := info.TipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal tipset key: %w", err) - } - ptsB, err := info.ParentTipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal parent tipset key: %w", err) - } - return s.queues[MinerTaskName].Enqueue(MinerTaskName, work.Q{ - "ts": string(tsB), - "pts": string(ptsB), - "head": info.Actor.Head.String(), - "address": info.Address.String(), - "stateroot": info.ParentStateRoot.String(), - }) -} - -func (s *Scheduler) queuePowerTask(info indexer.ActorInfo) (*work.Job, error) { - tsB, err := info.TipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal tipset key: %w", err) - } - ptsB, err := info.ParentTipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal parent tipset key: %w", err) - } - return s.queues[PowerTaskName].Enqueue(PowerTaskName, work.Q{ - "ts": string(tsB), - "pts": string(ptsB), - "head": info.Actor.Head.String(), - "address": info.Address.String(), - "stateroot": info.ParentStateRoot.String(), - }) - -} - -func (s *Scheduler) queueMarketTask(info indexer.ActorInfo) (*work.Job, error) { - tsB, err := info.TipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal tipset key: %w", err) - } - ptsB, err := info.ParentTipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal parent tipset key: %w", err) - } - return s.queues[MarketTaskName].Enqueue(MarketTaskName, work.Q{ - "ts": string(tsB), - "pts": string(ptsB), - "head": info.Actor.Head.String(), - "address": info.Address.String(), - "stateroot": info.ParentStateRoot.String(), - }) -} - -func (s *Scheduler) queueInitTask(info indexer.ActorInfo) (*work.Job, error) { - tsB, err := info.TipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal tipset key: %w", err) - } - ptsB, err := info.ParentTipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal parent tipset key: %w", err) - } - return s.queues[InitActorTaskName].Enqueue(InitActorTaskName, work.Q{ - "ts": string(tsB), - "pts": string(ptsB), - "head": info.Actor.Head.String(), - "address": 
info.Address.String(), - "stateroot": info.ParentStateRoot.String(), - }) - -} - -func (s *Scheduler) queueGenesisTask(genesisTs types.TipSetKey, genesisRoot cid.Cid) (*work.Job, error) { - tsB, err := genesisTs.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal tipset key: %w", err) - } - return s.queues[GenesisTaskName].EnqueueUnique(GenesisTaskName, work.Q{ - "ts": string(tsB), - "stateroot": genesisRoot.String(), - }) -} - -func (s *Scheduler) queueMessageTask(ts types.TipSetKey, st cid.Cid) (*work.Job, error) { - tsB, err := ts.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal tipset key: %w", err) - } - return s.queues[MessageTaskName].Enqueue(MessageTaskName, work.Q{ - "ts": string(tsB), - "stateroot": st.String(), - }) -} - -func (s *Scheduler) queueRewardTask(info indexer.ActorInfo) (*work.Job, error) { - tsB, err := info.TipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal tipset key: %w", err) - } - ptsB, err := info.ParentTipSet.MarshalJSON() - if err != nil { - return nil, err - } - - return s.queues[RewardTaskName].Enqueue(RewardTaskName, work.Q{ - "ts": string(tsB), - "pts": string(ptsB), - "head": info.Actor.Head.String(), - "stateroot": info.ParentStateRoot.String(), - }) -} - -func (s *Scheduler) queueCommonTask(info indexer.ActorInfo) (*work.Job, error) { - tsB, err := info.TipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal tipset key: %w", err) - } - ptsB, err := info.ParentTipSet.MarshalJSON() - if err != nil { - return nil, xerrors.Errorf("marshal parent tipset key: %w", err) - } - - return s.queues[CommonTaskName].Enqueue(CommonTaskName, work.Q{ - "ts": string(tsB), - "pts": string(ptsB), - "stateroot": info.ParentStateRoot.String(), - "address": info.Address.String(), - "head": info.Actor.Head.String(), - "code": info.Actor.Code.String(), - "balance": info.Actor.Balance.String(), - "nonce": strconv.FormatUint(info.Actor.Nonce, 10), - }) -} diff --git a/services/processor/tasks/common/actor.go b/services/processor/tasks/common/actor.go deleted file mode 100644 index b4445a770..000000000 --- a/services/processor/tasks/common/actor.go +++ /dev/null @@ -1,197 +0,0 @@ -package common - -import ( - "context" - "encoding/json" - "strconv" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/big" - lapi "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/builtin" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - commonmodel "github.com/filecoin-project/sentinel-visor/model/actors/common" -) - -func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { - pool := work.NewWorkerPool(ProcessActorTask{}, concurrency, poolName, redisPool) - queue := work.NewEnqueuer(poolName, redisPool) - - // https://github.com/gocraft/work/issues/10#issuecomment-237580604 - // adding fields via a closure gives the workers access to the lotus api, a global could also be used here - pool.Middleware(func(mt *ProcessActorTask, job *work.Job, next work.NextMiddlewareFunc) error { - mt.node = node - mt.pubCh = pubCh - mt.log = logging.Logger("commonactortask") 
- return next() - }) - // log all task - pool.Middleware((*ProcessActorTask).Log) - - // register task method and don't allow retying - pool.JobWithOptions(taskName, work.JobOptions{ - MaxFails: 1, - }, (*ProcessActorTask).Task) - - return pool, queue -} - -type ProcessActorTask struct { - node lapi.FullNode - log *logging.ZapEventLogger - - pubCh chan<- model.Persistable - - tsKey types.TipSetKey - ptsKey types.TipSetKey - stateroot cid.Cid - addr address.Address - head cid.Cid - code cid.Cid - balance big.Int - nonce uint64 -} - -func (p *ProcessActorTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { - p.log.Infow("starting common actor task", "job", job.ID, "args", job.Args) - return next() -} - -func (p *ProcessActorTask) ParseArgs(job *work.Job) error { - // this needs a better pattern.... - tsStr := job.ArgString("ts") - if err := job.ArgError(); err != nil { - return err - } - - ptsStr := job.ArgString("pts") - if err := job.ArgError(); err != nil { - return err - } - - srStr := job.ArgString("stateroot") - if err := job.ArgError(); err != nil { - return err - } - - addrStr := job.ArgString("address") - if err := job.ArgError(); err != nil { - return err - } - - headStr := job.ArgString("head") - if err := job.ArgError(); err != nil { - return err - } - - codeStr := job.ArgString("code") - if err := job.ArgError(); err != nil { - return err - } - - balStr := job.ArgString("balance") - if err := job.ArgError(); err != nil { - return err - } - - nonceStr := job.ArgString("nonce") - if err := job.ArgError(); err != nil { - return err - } - - var tsKey types.TipSetKey - if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { - return err - } - - var ptsKey types.TipSetKey - if err := ptsKey.UnmarshalJSON([]byte(ptsStr)); err != nil { - return err - } - - stateroot, err := cid.Decode(srStr) - if err != nil { - return err - } - - addr, err := address.NewFromString(addrStr) - if err != nil { - return err - } - - head, err := cid.Decode(headStr) - if err != nil { - return err - } - - code, err := cid.Decode(codeStr) - if err != nil { - return err - } - - balance, err := big.FromString(balStr) - if err != nil { - return err - } - - nonce, err := strconv.ParseUint(nonceStr, 10, 64) - if err != nil { - return err - } - - p.tsKey = tsKey - p.ptsKey = ptsKey - p.stateroot = stateroot - p.addr = addr - p.head = head - p.code = code - p.balance = balance - p.nonce = nonce - return nil -} - -func (p *ProcessActorTask) Task(job *work.Job) error { - if err := p.ParseArgs(job); err != nil { - return err - } - - ctx := context.Background() - ctx, span := global.Tracer("").Start(ctx, "ProcessActorTask.Task") - defer span.End() - - ast, err := p.node.StateReadState(ctx, p.addr, p.tsKey) - if err != nil { - return err - } - - state, err := json.Marshal(ast.State) - if err != nil { - return err - } - - p.pubCh <- &commonmodel.ActorTaskResult{ - Actor: &commonmodel.Actor{ - ID: p.addr.String(), - StateRoot: p.stateroot.String(), - Code: builtin.ActorNameByCode(p.code), - Head: p.head.String(), - Balance: p.balance.String(), - Nonce: p.nonce, - }, - State: &commonmodel.ActorState{ - Head: p.head.String(), - Code: p.code.String(), - State: string(state), - }, - } - return nil -} diff --git a/services/processor/tasks/genesis/genesis.go b/services/processor/tasks/genesis/genesis.go deleted file mode 100644 index 34eaffef3..000000000 --- a/services/processor/tasks/genesis/genesis.go +++ /dev/null @@ -1,287 +0,0 @@ -package genesis - -import ( - "context" - "strconv" - - "github.com/gocraft/work" - 
"github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/go-state-types/abi" - init_ "github.com/filecoin-project/lotus/chain/actors/builtin/init" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/builtin" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - initmodel "github.com/filecoin-project/sentinel-visor/model/actors/init" - marketmodel "github.com/filecoin-project/sentinel-visor/model/actors/market" - minermodel "github.com/filecoin-project/sentinel-visor/model/actors/miner" - genesismodel "github.com/filecoin-project/sentinel-visor/model/genesis" -) - -func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { - pool := work.NewWorkerPool(ProcessGenesisSingletonTask{}, concurrency, poolName, redisPool) - queue := work.NewEnqueuer(poolName, redisPool) - - // https://github.com/gocraft/work/issues/10#issuecomment-237580604 - // adding fields via a closure gives the workers access to the lotus api, a global could also be used here - pool.Middleware(func(mt *ProcessGenesisSingletonTask, job *work.Job, next work.NextMiddlewareFunc) error { - mt.node = node - mt.pubCh = pubCh - mt.log = logging.Logger("genesistask") - return next() - }) - // log all task - pool.Middleware((*ProcessGenesisSingletonTask).Log) - - // register task method and don't allow retying - pool.JobWithOptions(taskName, work.JobOptions{ - MaxFails: 1, - }, (*ProcessGenesisSingletonTask).Task) - - return pool, queue -} - -type ProcessGenesisSingletonTask struct { - node lens.API - log *logging.ZapEventLogger - - pubCh chan<- model.Persistable - - genesis types.TipSetKey - stateroot cid.Cid -} - -func (p *ProcessGenesisSingletonTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { - p.log.Infow("Starting Job", "name", job.Name, "Args", job.Args) - return next() -} - -func (p *ProcessGenesisSingletonTask) ParseArgs(job *work.Job) error { - srStr := job.ArgString("stateroot") - if err := job.ArgError(); err != nil { - return err - } - - tsStr := job.ArgString("ts") - if err := job.ArgError(); err != nil { - return err - } - - sr, err := cid.Decode(srStr) - if err != nil { - return err - } - - var tsKey types.TipSetKey - if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { - return err - } - p.genesis = tsKey - p.stateroot = sr - return nil -} - -func (p *ProcessGenesisSingletonTask) Task(job *work.Job) error { - if err := p.ParseArgs(job); err != nil { - return err - } - ctx := context.Background() - ctx, span := global.Tracer("").Start(ctx, "ProcessGenesisSingletonTask.Task") - defer span.End() - - genesisAddrs, err := p.node.StateListActors(ctx, p.genesis) - if err != nil { - return err - } - - result := &genesismodel.ProcessGenesisSingletonResult{} - for _, addr := range genesisAddrs { - genesisAct, err := p.node.StateGetActor(ctx, addr, p.genesis) - if err != nil { - return err - } - switch genesisAct.Code { - case builtin.SystemActorCodeID: - // TODO - case builtin.InitActorCodeID: - res, err := p.initActorState(genesisAct) - if err != nil { - return err - } - result.SetInitActor(res) - case builtin.CronActorCodeID: - // TODO - case builtin.AccountActorCodeID: - // TODO - 
case builtin.StoragePowerActorCodeID: - // TODO - case builtin.StorageMarketActorCodeID: - res, err := p.storageMarketState(ctx) - if err != nil { - return err - } - result.SetMarket(res) - case builtin.StorageMinerActorCodeID: - res, err := p.storageMinerState(ctx, addr, genesisAct) - if err != nil { - return err - } - result.AddMiner(res) - case builtin.PaymentChannelActorCodeID: - // TODO - case builtin.MultisigActorCodeID: - // TODO - case builtin.RewardActorCodeID: - // TODO - case builtin.VerifiedRegistryActorCodeID: - // TODO - default: - p.log.Warnf("unknown actor in genesis state. address: %s code: %s head: %s", addr, genesisAct.Code, genesisAct.Head) - } - } - p.pubCh <- result - return nil -} - -func (p *ProcessGenesisSingletonTask) storageMinerState(ctx context.Context, addr address.Address, act *types.Actor) (*genesismodel.GenesisMinerTaskResult, error) { - // actual miner actor state and miner info - mstate, err := miner.Load(p.node.Store(), act) - if err != nil { - return nil, err - } - - // miner raw and qual power - // TODO this needs caching so we don't re-fetch the power actors claim table for every tipset. - mpower, err := p.node.StateMinerPower(ctx, addr, p.genesis) - if err != nil { - return nil, err - } - - msectors, err := mstate.LoadSectors(nil) - if err != nil { - return nil, err - } - - minfo, err := mstate.Info() - if err != nil { - return nil, err - } - - powerModel := &minermodel.MinerPower{ - MinerID: addr.String(), - StateRoot: p.stateroot.String(), - RawBytePower: mpower.MinerPower.RawBytePower.String(), - QualityAdjustedPower: mpower.MinerPower.QualityAdjPower.String(), - } - - stateModel := &minermodel.MinerState{ - MinerID: addr.String(), - OwnerID: minfo.Owner.String(), - WorkerID: minfo.Worker.String(), - PeerID: minfo.PeerId.String(), - SectorSize: minfo.SectorSize.ShortString(), - } - - sectorsModel := make(minermodel.MinerSectorInfos, len(msectors)) - dealsModel := minermodel.MinerDealSectors{} - for idx, sector := range msectors { - sectorsModel[idx] = &minermodel.MinerSectorInfo{ - MinerID: addr.String(), - SectorID: uint64(sector.SectorNumber), - StateRoot: p.stateroot.String(), - SealedCID: sector.SealedCID.String(), - ActivationEpoch: int64(sector.Activation), - ExpirationEpoch: int64(sector.Expiration), - DealWeight: sector.DealWeight.String(), - VerifiedDealWeight: sector.VerifiedDealWeight.String(), - InitialPledge: sector.InitialPledge.String(), - ExpectedDayReward: sector.ExpectedDayReward.String(), - ExpectedStoragePledge: sector.ExpectedStoragePledge.String(), - } - for _, dealID := range sector.DealIDs { - dealsModel = append(dealsModel, &minermodel.MinerDealSector{ - MinerID: addr.String(), - SectorID: uint64(sector.SectorNumber), - DealID: uint64(dealID), - }) - } - } - return &genesismodel.GenesisMinerTaskResult{ - StateModel: stateModel, - PowerModel: powerModel, - SectorModels: sectorsModel, - DealModels: dealsModel, - }, nil -} - -func (p *ProcessGenesisSingletonTask) initActorState(act *types.Actor) (*genesismodel.GenesisInitActorTaskResult, error) { - initActorState, err := init_.Load(p.node.Store(), act) - if err != nil { - return nil, err - } - - out := initmodel.IdAddressList{} - if err := initActorState.ForEachActor(func(id abi.ActorID, addr address.Address) error { - out = append(out, &initmodel.IdAddress{ - ID: id.String(), - Address: addr.String(), - StateRoot: p.stateroot.String(), - }) - return nil - }); err != nil { - return nil, err - } - return &genesismodel.GenesisInitActorTaskResult{AddressMap: out}, nil -} - -func 
(p *ProcessGenesisSingletonTask) storageMarketState(ctx context.Context) (*genesismodel.GenesisMarketTaskResult, error) { - dealStates, err := p.node.StateMarketDeals(ctx, p.genesis) - if err != nil { - return nil, err - } - - states := make(marketmodel.MarketDealStates, len(dealStates)) - proposals := make(marketmodel.MarketDealProposals, len(dealStates)) - idx := 0 - for idStr, deal := range dealStates { - dealID, err := strconv.ParseUint(idStr, 10, 64) - if err != nil { - return nil, err - } - states[idx] = &marketmodel.MarketDealState{ - DealID: dealID, - SectorStartEpoch: int64(deal.State.SectorStartEpoch), - LastUpdateEpoch: int64(deal.State.LastUpdatedEpoch), - SlashEpoch: int64(deal.State.SlashEpoch), - StateRoot: p.stateroot.String(), - } - proposals[idx] = &marketmodel.MarketDealProposal{ - DealID: dealID, - StateRoot: p.stateroot.String(), - PaddedPieceSize: uint64(deal.Proposal.PieceSize), - UnpaddedPieceSize: uint64(deal.Proposal.PieceSize.Unpadded()), - StartEpoch: int64(deal.Proposal.StartEpoch), - EndEpoch: int64(deal.Proposal.EndEpoch), - ClientID: deal.Proposal.Client.String(), - ProviderID: deal.Proposal.Provider.String(), - ClientCollateral: deal.Proposal.ClientCollateral.String(), - ProviderCollateral: deal.Proposal.ProviderCollateral.String(), - StoragePricePerEpoch: deal.Proposal.StoragePricePerEpoch.String(), - PieceCID: deal.Proposal.PieceCID.String(), - IsVerified: deal.Proposal.VerifiedDeal, - Label: deal.Proposal.Label, - } - idx++ - } - return &genesismodel.GenesisMarketTaskResult{ - states, - proposals, - }, nil -} diff --git a/services/processor/tasks/init/init_actor.go b/services/processor/tasks/init/init_actor.go deleted file mode 100644 index a16a900a9..000000000 --- a/services/processor/tasks/init/init_actor.go +++ /dev/null @@ -1,149 +0,0 @@ -package init - -import ( - "context" - "fmt" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - - "github.com/filecoin-project/lotus/chain/events/state" - "github.com/filecoin-project/lotus/chain/types" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - initmodel "github.com/filecoin-project/sentinel-visor/model/actors/init" -) - -func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { - pool := work.NewWorkerPool(ProcessInitActorTask{}, concurrency, poolName, redisPool) - queue := work.NewEnqueuer(poolName, redisPool) - - // https://github.com/gocraft/work/issues/10#issuecomment-237580604 - // adding fields via a closure gives the workers access to the lotus api, a global could also be used here - pool.Middleware(func(mt *ProcessInitActorTask, job *work.Job, next work.NextMiddlewareFunc) error { - mt.node = node - mt.pubCh = pubCh - mt.log = logging.Logger("markettask") - return next() - }) - // log all task - pool.Middleware((*ProcessInitActorTask).Log) - - // register task method and don't allow retying - pool.JobWithOptions(taskName, work.JobOptions{ - MaxFails: 1, - }, (*ProcessInitActorTask).Task) - - return pool, queue -} - -type ProcessInitActorTask struct { - node lens.API - log *logging.ZapEventLogger - - pubCh chan<- model.Persistable - - head cid.Cid - stateroot cid.Cid - tsKey types.TipSetKey - ptsKey types.TipSetKey -} - -func (p *ProcessInitActorTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { - 
p.log.Infow("starting init actor task", "job", job.ID, "args", job.Args) - return next() -} - -func (p *ProcessInitActorTask) ParseArgs(job *work.Job) error { - headStr := job.ArgString("head") - if err := job.ArgError(); err != nil { - return err - } - - srStr := job.ArgString("stateroot") - if err := job.ArgError(); err != nil { - return err - } - - tsStr := job.ArgString("ts") - if err := job.ArgError(); err != nil { - return err - } - - ptsStr := job.ArgString("pts") - if err := job.ArgError(); err != nil { - return err - } - - mhead, err := cid.Decode(headStr) - if err != nil { - return err - } - - mstateroot, err := cid.Decode(srStr) - if err != nil { - return err - } - - var tsKey types.TipSetKey - if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { - return err - } - - var ptsKey types.TipSetKey - if err := ptsKey.UnmarshalJSON([]byte(ptsStr)); err != nil { - return err - } - - p.head = mhead - p.tsKey = tsKey - p.ptsKey = ptsKey - p.stateroot = mstateroot - return nil -} - -func (p *ProcessInitActorTask) Task(job *work.Job) error { - if err := p.ParseArgs(job); err != nil { - return err - } - - ctx := context.Background() - ctx, span := global.Tracer("").Start(ctx, "ProcessInitActorTask.Task") - defer span.End() - - pred := state.NewStatePredicates(p.node) - stateDiff := pred.OnInitActorChange(pred.OnAddressMapChange()) - changed, val, err := stateDiff(ctx, p.ptsKey, p.tsKey) - if err != nil { - return err - } - if !changed { - return err - } - changes, ok := val.(*state.InitActorAddressChanges) - if !ok { - return fmt.Errorf("unknown type returned by init acotr hamt predicate: %T", val) - } - - out := make(initmodel.IdAddressList, len(changes.Added)+len(changes.Modified)) - for idx, add := range changes.Added { - out[idx] = &initmodel.IdAddress{ - ID: add.ID.String(), - Address: add.PK.String(), - StateRoot: p.stateroot.String(), - } - } - for idx, mod := range changes.Modified { - out[idx] = &initmodel.IdAddress{ - ID: mod.To.ID.String(), - Address: mod.To.PK.String(), - StateRoot: p.stateroot.String(), - } - } - p.pubCh <- out - return nil -} diff --git a/services/processor/tasks/interface.go b/services/processor/tasks/interface.go deleted file mode 100644 index 2a79e677c..000000000 --- a/services/processor/tasks/interface.go +++ /dev/null @@ -1,9 +0,0 @@ -package tasks - -import "github.com/gocraft/work" - -type ProcessTask interface { - Log(job *work.Job, next work.NextMiddlewareFunc) error - ParseArgs(job *work.Job) error - Task(job *work.Job) error -} diff --git a/services/processor/tasks/market/market.go b/services/processor/tasks/market/market.go deleted file mode 100644 index ea0b7ee00..000000000 --- a/services/processor/tasks/market/market.go +++ /dev/null @@ -1,215 +0,0 @@ -package market - -import ( - "context" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - "golang.org/x/xerrors" - - market "github.com/filecoin-project/lotus/chain/actors/builtin/market" - "github.com/filecoin-project/lotus/chain/events/state" - "github.com/filecoin-project/lotus/chain/types" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - marketmodel "github.com/filecoin-project/sentinel-visor/model/actors/market" -) - -func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { - pool := 
work.NewWorkerPool(ProcessMarketTask{}, concurrency, poolName, redisPool) - queue := work.NewEnqueuer(poolName, redisPool) - - // https://github.com/gocraft/work/issues/10#issuecomment-237580604 - // adding fields via a closure gives the workers access to the lotus api, a global could also be used here - pool.Middleware(func(mt *ProcessMarketTask, job *work.Job, next work.NextMiddlewareFunc) error { - mt.node = node - mt.pubCh = pubCh - mt.log = logging.Logger("markettask") - return next() - }) - // log all task - pool.Middleware((*ProcessMarketTask).Log) - - // register task method and don't allow retying - pool.JobWithOptions(taskName, work.JobOptions{ - MaxFails: 1, - }, (*ProcessMarketTask).Task) - - return pool, queue -} - -type ProcessMarketTask struct { - node lens.API - log *logging.ZapEventLogger - - pubCh chan<- model.Persistable - - head cid.Cid - stateroot cid.Cid - tsKey types.TipSetKey - ptsKey types.TipSetKey -} - -func (pmt *ProcessMarketTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { - pmt.log.Infow("Starting Market Task", "name", job.Name, "Args", job.Args) - return next() -} - -func (pmt *ProcessMarketTask) ParseArgs(job *work.Job) error { - headStr := job.ArgString("head") - if err := job.ArgError(); err != nil { - return err - } - - srStr := job.ArgString("stateroot") - if err := job.ArgError(); err != nil { - return err - } - - tsStr := job.ArgString("ts") - if err := job.ArgError(); err != nil { - return err - } - - ptsStr := job.ArgString("pts") - if err := job.ArgError(); err != nil { - return err - } - - mhead, err := cid.Decode(headStr) - if err != nil { - return err - } - - mstateroot, err := cid.Decode(srStr) - if err != nil { - return err - } - - var tsKey types.TipSetKey - if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { - return err - } - - var ptsKey types.TipSetKey - if err := ptsKey.UnmarshalJSON([]byte(ptsStr)); err != nil { - return err - } - - pmt.head = mhead - pmt.tsKey = tsKey - pmt.ptsKey = ptsKey - pmt.stateroot = mstateroot - return nil -} - -func (pmt *ProcessMarketTask) Task(job *work.Job) error { - if err := pmt.ParseArgs(job); err != nil { - return err - } - - ctx := context.Background() - ctx, span := global.Tracer("").Start(ctx, "ProcessMarketTask.Task") - defer span.End() - - proposals, err := pmt.marketDealProposalChanges(ctx) - if err != nil { - return err - } - - states, err := pmt.marketDealStateChanges(ctx) - if err != nil { - return err - } - - pmt.pubCh <- &marketmodel.MarketTaskResult{ - Proposals: proposals, - States: states, - } - - return nil -} - -func (pmt *ProcessMarketTask) marketDealStateChanges(ctx context.Context) (marketmodel.MarketDealStates, error) { - pred := state.NewStatePredicates(pmt.node) - stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged())) - changed, val, err := stateDiff(ctx, pmt.ptsKey, pmt.tsKey) - if err != nil { - return nil, err - } - if !changed { - return nil, nil - } - changes, ok := val.(*market.DealStateChanges) - if !ok { - // indicates a developer error or breaking change in lotus - return nil, xerrors.Errorf("Unknown type returned by Deal State AMT predicate: %T", val) - } - - out := make(marketmodel.MarketDealStates, len(changes.Added)+len(changes.Modified)) - idx := 0 - for _, add := range changes.Added { - out[idx] = &marketmodel.MarketDealState{ - DealID: uint64(add.ID), - StateRoot: pmt.stateroot.String(), - SectorStartEpoch: int64(add.Deal.SectorStartEpoch), - LastUpdateEpoch: int64(add.Deal.LastUpdatedEpoch), - 
SlashEpoch: int64(add.Deal.SlashEpoch), - } - idx++ - } - for _, mod := range changes.Modified { - out[idx] = &marketmodel.MarketDealState{ - DealID: uint64(mod.ID), - SectorStartEpoch: int64(mod.To.SectorStartEpoch), - LastUpdateEpoch: int64(mod.To.LastUpdatedEpoch), - SlashEpoch: int64(mod.To.SlashEpoch), - StateRoot: pmt.stateroot.String(), - } - idx++ - } - return out, nil -} - -func (pmt *ProcessMarketTask) marketDealProposalChanges(ctx context.Context) (marketmodel.MarketDealProposals, error) { - pred := state.NewStatePredicates(pmt.node) - stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealProposalChanged(pred.OnDealProposalAmtChanged())) - changed, val, err := stateDiff(ctx, pmt.ptsKey, pmt.tsKey) - if err != nil { - return nil, err - } - if !changed { - return nil, nil - } - changes, ok := val.(*market.DealProposalChanges) - if !ok { - // indicates a developer error or breaking change in lotus - return nil, xerrors.Errorf("Unknown type returned by Deal Proposal AMT predicate: %T", val) - } - - out := make(marketmodel.MarketDealProposals, len(changes.Added)) - - for idx, add := range changes.Added { - out[idx] = &marketmodel.MarketDealProposal{ - DealID: uint64(add.ID), - StateRoot: pmt.stateroot.String(), - PaddedPieceSize: uint64(add.Proposal.PieceSize), - UnpaddedPieceSize: uint64(add.Proposal.PieceSize.Unpadded()), - StartEpoch: int64(add.Proposal.StartEpoch), - EndEpoch: int64(add.Proposal.EndEpoch), - ClientID: add.Proposal.Client.String(), - ProviderID: add.Proposal.Provider.String(), - ClientCollateral: add.Proposal.ClientCollateral.String(), - ProviderCollateral: add.Proposal.ProviderCollateral.String(), - StoragePricePerEpoch: add.Proposal.StoragePricePerEpoch.String(), - PieceCID: add.Proposal.PieceCID.String(), - IsVerified: add.Proposal.VerifiedDeal, - Label: add.Proposal.Label, - } - } - return out, nil -} diff --git a/services/processor/tasks/message/message.go b/services/processor/tasks/message/message.go deleted file mode 100644 index 1c10e0301..000000000 --- a/services/processor/tasks/message/message.go +++ /dev/null @@ -1,190 +0,0 @@ -package message - -import ( - "context" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - - "github.com/filecoin-project/lotus/chain/types" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - messagemodel "github.com/filecoin-project/sentinel-visor/model/messages" -) - -func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { - pool := work.NewWorkerPool(ProcessMessageTask{}, concurrency, poolName, redisPool) - queue := work.NewEnqueuer(poolName, redisPool) - - // https://github.com/gocraft/work/issues/10#issuecomment-237580604 - // adding fields via a closure gives the workers access to the lotus api, a global could also be used here - pool.Middleware(func(mt *ProcessMessageTask, job *work.Job, next work.NextMiddlewareFunc) error { - mt.node = node - mt.pubCh = pubCh - mt.log = logging.Logger("messagetask") - return next() - }) - // log all task - pool.Middleware((*ProcessMessageTask).Log) - - // register task method and don't allow retying - pool.JobWithOptions(taskName, work.JobOptions{ - MaxFails: 1, - }, (*ProcessMessageTask).Task) - - return pool, queue -} - -type ProcessMessageTask struct { - node lens.API - log *logging.ZapEventLogger - 
- pubCh chan<- model.Persistable - - ts types.TipSetKey - stateroot cid.Cid -} - -func (pm *ProcessMessageTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { - pm.log.Infow("starting message task", "job", job.ID, "args", job.Args) - return next() -} - -func (pm *ProcessMessageTask) ParseArgs(job *work.Job) error { - tsStr := job.ArgString("ts") - if err := job.ArgError(); err != nil { - return err - } - - srStr := job.ArgString("stateroot") - if err := job.ArgError(); err != nil { - return err - } - - stateroot, err := cid.Decode(srStr) - if err != nil { - return err - } - - var tsKey types.TipSetKey - if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { - return err - } - - pm.ts = tsKey - pm.stateroot = stateroot - return nil -} - -func (pm *ProcessMessageTask) Task(job *work.Job) error { - if err := pm.ParseArgs(job); err != nil { - return err - } - - ctx := context.Background() - ctx, span := global.Tracer("").Start(ctx, "ProcessMessageTask.Task") - defer span.End() - - msgs, blkMsgs, err := pm.fetchMessages(ctx) - if err != nil { - return err - } - - rcts, err := pm.fetchReceipts(ctx) - if err != nil { - return err - } - - pm.pubCh <- &messagemodel.MessageTaskResult{ - Messages: msgs, - BlockMessages: blkMsgs, - Receipts: rcts, - } - return nil -} - -func (pm *ProcessMessageTask) fetchMessages(ctx context.Context) (messagemodel.Messages, messagemodel.BlockMessages, error) { - msgs := messagemodel.Messages{} - bmsgs := messagemodel.BlockMessages{} - msgsSeen := map[cid.Cid]struct{}{} - - // TODO consider performing this work in parallel. - for _, blk := range pm.ts.Cids() { - blkMsgs, err := pm.node.ChainGetBlockMessages(ctx, blk) - if err != nil { - return nil, nil, err - } - - vmm := make([]*types.Message, 0, len(blkMsgs.Cids)) - for _, m := range blkMsgs.BlsMessages { - vmm = append(vmm, m) - } - - for _, m := range blkMsgs.SecpkMessages { - vmm = append(vmm, &m.Message) - } - - for _, message := range vmm { - bmsgs = append(bmsgs, &messagemodel.BlockMessage{ - Block: blk.String(), - Message: message.Cid().String(), - }) - - // so we don't create duplicate message models. 
- if _, seen := msgsSeen[message.Cid()]; seen { - continue - } - - var msgSize int - if b, err := message.Serialize(); err == nil { - msgSize = len(b) - } - msgs = append(msgs, &messagemodel.Message{ - Cid: message.Cid().String(), - From: message.From.String(), - To: message.To.String(), - Value: message.Value.String(), - GasFeeCap: message.GasFeeCap.String(), - GasPremium: message.GasPremium.String(), - GasLimit: message.GasLimit, - SizeBytes: msgSize, - Nonce: message.Nonce, - Method: uint64(message.Method), - Params: message.Params, - }) - msgsSeen[message.Cid()] = struct{}{} - } - } - return msgs, bmsgs, nil -} - -func (pm *ProcessMessageTask) fetchReceipts(ctx context.Context) (messagemodel.Receipts, error) { - out := messagemodel.Receipts{} - - for _, blk := range pm.ts.Cids() { - recs, err := pm.node.ChainGetParentReceipts(ctx, blk) - if err != nil { - return nil, err - } - msgs, err := pm.node.ChainGetParentMessages(ctx, blk) - if err != nil { - return nil, err - } - - for i, r := range recs { - out = append(out, &messagemodel.Receipt{ - Message: msgs[i].Cid.String(), - StateRoot: pm.stateroot.String(), - Idx: i, - ExitCode: int64(r.ExitCode), - GasUsed: r.GasUsed, - Return: r.Return, - }) - } - } - return out, nil -} diff --git a/services/processor/tasks/miner/miner.go b/services/processor/tasks/miner/miner.go deleted file mode 100644 index ccced713d..000000000 --- a/services/processor/tasks/miner/miner.go +++ /dev/null @@ -1,204 +0,0 @@ -package miner - -import ( - "context" - "fmt" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - miner "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - "github.com/filecoin-project/lotus/chain/types" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - minermodel "github.com/filecoin-project/sentinel-visor/model/actors/miner" -) - -func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { - pool := work.NewWorkerPool(ProcessMinerTask{}, concurrency, poolName, redisPool) - queue := work.NewEnqueuer(poolName, redisPool) - - // https://github.com/gocraft/work/issues/10#issuecomment-237580604 - // adding fields via a closure gives the workers access to the lotus api, a global could also be used here - pool.Middleware(func(mt *ProcessMinerTask, job *work.Job, next work.NextMiddlewareFunc) error { - mt.node = node - mt.pubCh = pubCh - mt.log = logging.Logger("minertask") - return next() - }) - // log all task - pool.Middleware((*ProcessMinerTask).Log) - - // register task method and don't allow retying - pool.JobWithOptions(taskName, work.JobOptions{ - MaxFails: 1, - }, (*ProcessMinerTask).Task) - - return pool, queue -} - -type ProcessMinerTask struct { - node lens.API - log *logging.ZapEventLogger - - pubCh chan<- model.Persistable - - maddr address.Address - head cid.Cid - tsKey types.TipSetKey - ptsKey types.TipSetKey - stateroot cid.Cid -} - -func (p *ProcessMinerTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { - p.log.Infow("Starting Miner Task", "name", job.Name, "Args", job.Args) - return next() -} - -func (p *ProcessMinerTask) ParseArgs(job *work.Job) error { - addrStr := job.ArgString("address") - if err := job.ArgError(); err != nil { - return err - } - - headStr 
:= job.ArgString("head") - if err := job.ArgError(); err != nil { - return err - } - - srStr := job.ArgString("stateroot") - if err := job.ArgError(); err != nil { - return err - } - - tsStr := job.ArgString("ts") - if err := job.ArgError(); err != nil { - return err - } - - ptsStr := job.ArgString("pts") - if err := job.ArgError(); err != nil { - return err - } - - maddr, err := address.NewFromString(addrStr) - if err != nil { - return err - } - - mhead, err := cid.Decode(headStr) - if err != nil { - return err - } - - mstateroot, err := cid.Decode(srStr) - if err != nil { - return err - } - - var tsKey types.TipSetKey - if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { - return err - } - - var ptsKey types.TipSetKey - if err := ptsKey.UnmarshalJSON([]byte(ptsStr)); err != nil { - return err - } - - p.maddr = maddr - p.head = mhead - p.tsKey = tsKey - p.ptsKey = ptsKey - p.stateroot = mstateroot - return nil -} - -func (p *ProcessMinerTask) Task(job *work.Job) error { - if err := p.ParseArgs(job); err != nil { - return err - } - // TODO: - // - all processing below can and probably should be done in parallel. - // - processing is incomplete, see below TODO about sector inspection. - // - need caching infront of the lotus api to avoid refetching power for same tipset. - ctx := context.Background() - ctx, span := global.Tracer("").Start(ctx, "ProcessMinerTask.Task") - defer span.End() - - curActor, err := p.node.StateGetActor(ctx, p.maddr, p.tsKey) - if err != nil { - return xerrors.Errorf("loading current miner actor: %w", err) - } - - curState, err := miner.Load(p.node.Store(), curActor) - if err != nil { - return xerrors.Errorf("loading current miner state: %w", err) - } - - minfo, err := curState.Info() - if err != nil { - return xerrors.Errorf("loading miner info: %w", err) - } - - // miner raw and qual power - // TODO this needs caching so we don't re-fetch the power actors claim table (that holds this info) for every tipset. - minerPower, err := p.node.StateMinerPower(ctx, p.maddr, p.tsKey) - if err != nil { - return xerrors.Errorf("loading miner power: %w", err) - } - - // needed for diffing. - prevActor, err := p.node.StateGetActor(ctx, p.maddr, p.ptsKey) - if err != nil { - return xerrors.Errorf("loading previous miner actor: %w", err) - } - - prevState, err := miner.Load(p.node.Store(), prevActor) - if err != nil { - return xerrors.Errorf("loading previous miner actor state: %w", err) - } - - preCommitChanges, err := miner.DiffPreCommits(prevState, curState) - if err != nil { - return xerrors.Errorf("diffing miner precommits: %w", err) - } - - sectorChanges, err := miner.DiffSectors(prevState, curState) - if err != nil { - return xerrors.Errorf("diffing miner sectors: %w", err) - } - - // miner partition changes - partitionsDiff, err := p.minerPartitionsDiff(ctx, prevState, curState) - if err != nil { - return fmt.Errorf("diffing miner partitions: %v", err) - } - - // TODO we still need to do a little bit more processing here around sectors to get all the info we need, but this is okay for spike. 
- - p.pubCh <- &minermodel.MinerTaskResult{ - Ts: p.tsKey, - Pts: p.ptsKey, - Addr: p.maddr, - StateRoot: p.stateroot, - Actor: curActor, - State: curState, - Info: minfo, - Power: minerPower, - PreCommitChanges: preCommitChanges, - SectorChanges: sectorChanges, - PartitionDiff: partitionsDiff, - } - return nil -} - -func (p *ProcessMinerTask) minerPartitionsDiff(ctx context.Context, prevState, curState miner.State) (map[uint64]*minermodel.PartitionStatus, error) { - panic("NYI") -} diff --git a/services/processor/tasks/power/power.go b/services/processor/tasks/power/power.go deleted file mode 100644 index bf6c65636..000000000 --- a/services/processor/tasks/power/power.go +++ /dev/null @@ -1,175 +0,0 @@ -package power - -import ( - "context" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - "golang.org/x/xerrors" - - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/chain/actors/builtin/power" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/builtin" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - powermodel "github.com/filecoin-project/sentinel-visor/model/actors/power" -) - -func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { - pool := work.NewWorkerPool(ProcessPowerTask{}, concurrency, poolName, redisPool) - queue := work.NewEnqueuer(poolName, redisPool) - - // https://github.com/gocraft/work/issues/10#issuecomment-237580604 - // adding fields via a closure gives the workers access to the lotus api, a global could also be used here - pool.Middleware(func(mt *ProcessPowerTask, job *work.Job, next work.NextMiddlewareFunc) error { - mt.node = node - mt.pubCh = pubCh - mt.log = logging.Logger("minertask") - return next() - }) - // log all task - pool.Middleware((*ProcessPowerTask).Log) - - // register task method and don't allow retying - pool.JobWithOptions(taskName, work.JobOptions{ - MaxFails: 1, - }, (*ProcessPowerTask).Task) - - return pool, queue -} - -type ProcessPowerTask struct { - node lens.API - log *logging.ZapEventLogger - - pubCh chan<- model.Persistable - - maddr address.Address - head cid.Cid - tsKey types.TipSetKey - ptsKey types.TipSetKey - stateroot cid.Cid -} - -func (p *ProcessPowerTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { - p.log.Infow("Starting job", "name", job.Name, "args", job.Args) - return next() -} - -func (p *ProcessPowerTask) ParseArgs(job *work.Job) error { - addrStr := job.ArgString("address") - if err := job.ArgError(); err != nil { - return err - } - - headStr := job.ArgString("head") - if err := job.ArgError(); err != nil { - return err - } - - srStr := job.ArgString("stateroot") - if err := job.ArgError(); err != nil { - return err - } - - tsStr := job.ArgString("ts") - if err := job.ArgError(); err != nil { - return err - } - - ptsStr := job.ArgString("pts") - if err := job.ArgError(); err != nil { - return err - } - - maddr, err := address.NewFromString(addrStr) - if err != nil { - return err - } - - mhead, err := cid.Decode(headStr) - if err != nil { - return err - } - - mstateroot, err := cid.Decode(srStr) - if err != nil { - return err - } - - var tsKey types.TipSetKey - if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { - return err - } - - var ptsKey 
types.TipSetKey - if err := ptsKey.UnmarshalJSON([]byte(ptsStr)); err != nil { - return err - } - - p.maddr = maddr - p.head = mhead - p.tsKey = tsKey - p.ptsKey = ptsKey - p.stateroot = mstateroot - return nil -} - -func (p *ProcessPowerTask) Task(job *work.Job) error { - if err := p.ParseArgs(job); err != nil { - return err - } - ctx := context.Background() - ctx, span := global.Tracer("").Start(ctx, "ProcessPowerTask.Task") - defer span.End() - - powerActor, err := p.node.StateGetActor(ctx, builtin.StoragePowerActorAddr, p.tsKey) - if err != nil { - return xerrors.Errorf("loading power actor: %w", err) - } - - pstate, err := power.Load(p.node.Store(), powerActor) - if err != nil { - return xerrors.Errorf("loading power actor state: %w", err) - } - - locked, err := pstate.TotalLocked() - if err != nil { - return err - } - pow, err := pstate.TotalPower() - if err != nil { - return err - } - commit, err := pstate.TotalCommitted() - if err != nil { - return err - } - smoothed, err := pstate.TotalPowerSmoothed() - if err != nil { - return err - } - participating, total, err := pstate.MinerCounts() - if err != nil { - return err - } - - p.pubCh <- &powermodel.ChainPower{ - StateRoot: p.stateroot.String(), - TotalRawBytesPower: pow.RawBytePower.String(), - TotalQABytesPower: pow.QualityAdjPower.String(), - TotalRawBytesCommitted: commit.RawBytePower.String(), - TotalQABytesCommitted: commit.QualityAdjPower.String(), - TotalPledgeCollateral: locked.String(), - QASmoothedPositionEstimate: smoothed.PositionEstimate.String(), - QASmoothedVelocityEstimate: smoothed.VelocityEstimate.String(), - MinerCount: total, - ParticipatingMinerCount: participating, - } - return nil -} diff --git a/services/processor/tasks/reward/reward.go b/services/processor/tasks/reward/reward.go deleted file mode 100644 index e4bf248db..000000000 --- a/services/processor/tasks/reward/reward.go +++ /dev/null @@ -1,130 +0,0 @@ -package reward - -import ( - "bytes" - "context" - - "github.com/gocraft/work" - "github.com/gomodule/redigo/redis" - "github.com/ipfs/go-cid" - logging "github.com/ipfs/go-log/v2" - "go.opentelemetry.io/otel/api/global" - - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/specs-actors/actors/builtin/reward" - - "github.com/filecoin-project/sentinel-visor/lens" - "github.com/filecoin-project/sentinel-visor/model" - rewardmodel "github.com/filecoin-project/sentinel-visor/model/actors/reward" -) - -func Setup(concurrency uint, taskName, poolName string, redisPool *redis.Pool, node lens.API, pubCh chan<- model.Persistable) (*work.WorkerPool, *work.Enqueuer) { - pool := work.NewWorkerPool(ProcessRewardTask{}, concurrency, poolName, redisPool) - queue := work.NewEnqueuer(poolName, redisPool) - - // https://github.com/gocraft/work/issues/10#issuecomment-237580604 - // adding fields via a closure gives the workers access to the lotus api, a global could also be used here - pool.Middleware(func(mt *ProcessRewardTask, job *work.Job, next work.NextMiddlewareFunc) error { - mt.node = node - mt.pubCh = pubCh - mt.log = logging.Logger("rewardtask") - return next() - }) - // log all task - pool.Middleware((*ProcessRewardTask).Log) - - // register task method and don't allow retying - pool.JobWithOptions(taskName, work.JobOptions{ - MaxFails: 1, - }, (*ProcessRewardTask).Task) - - return pool, queue -} - -type ProcessRewardTask struct { - node lens.API - log *logging.ZapEventLogger - - pubCh chan<- model.Persistable - - ts types.TipSetKey - head cid.Cid - stateroot cid.Cid -} - -func (p 
*ProcessRewardTask) Log(job *work.Job, next work.NextMiddlewareFunc) error { - p.log.Infow("starting process reward task", "job", job.ID, "args", job.Args) - return next() -} - -func (p *ProcessRewardTask) ParseArgs(job *work.Job) error { - tsStr := job.ArgString("ts") - if err := job.ArgError(); err != nil { - return err - } - - headStr := job.ArgString("head") - if err := job.ArgError(); err != nil { - return err - } - - srStr := job.ArgString("stateroot") - if err := job.ArgError(); err != nil { - return err - } - - stateroot, err := cid.Decode(srStr) - if err != nil { - return err - } - - head, err := cid.Decode(headStr) - if err != nil { - return err - } - - var tsKey types.TipSetKey - if err := tsKey.UnmarshalJSON([]byte(tsStr)); err != nil { - return err - } - - p.ts = tsKey - p.head = head - p.stateroot = stateroot - return nil -} - -func (p *ProcessRewardTask) Task(job *work.Job) error { - if err := p.ParseArgs(job); err != nil { - return err - } - - ctx := context.Background() - ctx, span := global.Tracer("").Start(ctx, "ProcessRewardTask.Task") - defer span.End() - - rewardStateRaw, err := p.node.ChainReadObj(ctx, p.head) - if err != nil { - return err - } - - var rwdState reward.State - if err := rwdState.UnmarshalCBOR(bytes.NewReader(rewardStateRaw)); err != nil { - return err - } - - p.pubCh <- &rewardmodel.ChainReward{ - StateRoot: p.stateroot.String(), - CumSumBaseline: rwdState.CumsumBaseline.String(), - CumSumRealized: rwdState.CumsumRealized.String(), - EffectiveBaselinePower: rwdState.EffectiveBaselinePower.String(), - NewBaselinePower: rwdState.ThisEpochBaselinePower.String(), - NewRewardSmoothedPositionEstimate: rwdState.ThisEpochRewardSmoothed.PositionEstimate.String(), - NewRewardSmoothedVelocityEstimate: rwdState.ThisEpochRewardSmoothed.VelocityEstimate.String(), - TotalMinedReward: rwdState.TotalMined.String(), - NewReward: rwdState.ThisEpochReward.String(), - EffectiveNetworkTime: int64(rwdState.EffectiveNetworkTime), - } - - return nil -} diff --git a/storage/sql.go b/storage/sql.go index a214e3bc0..1bda52f4d 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -146,19 +146,6 @@ func (d *Database) Close(ctx context.Context) error { return err } -func (d *Database) UnprocessedIndexedBlocks(ctx context.Context, maxHeight, limit int) (blocks.BlocksSynced, error) { - var blkSynced blocks.BlocksSynced - if err := d.DB.ModelContext(ctx, &blkSynced). - Where("height <= ?", maxHeight). - Where("processed_at is null"). - Order("height desc"). - Limit(limit). - Select(); err != nil { - return nil, err - } - return blkSynced, nil -} - func (d *Database) UnprocessedIndexedTipSets(ctx context.Context, maxHeight, limit int) (visor.ProcessingStateChangeList, error) { var blkSynced visor.ProcessingStateChangeList if err := d.DB.ModelContext(ctx, &blkSynced). @@ -172,17 +159,6 @@ func (d *Database) UnprocessedIndexedTipSets(ctx context.Context, maxHeight, lim return blkSynced, nil } -func (d *Database) MostRecentSyncedBlock(ctx context.Context) (*blocks.BlockSynced, error) { - var blkSynced *blocks.BlockSynced - if err := d.DB.ModelContext(ctx, blkSynced). - Order("height desc"). - Limit(1). - Select(); err != nil { - return nil, err - } - return blkSynced, nil -} - func (d *Database) MostRecentAddedTipSet(ctx context.Context) (*visor.ProcessingStateChange, error) { blkSynced := &visor.ProcessingStateChange{} if err := d.DB.ModelContext(ctx, blkSynced). 
@@ -194,54 +170,6 @@ func (d *Database) MostRecentAddedTipSet(ctx context.Context) (*visor.Processing return blkSynced, nil } -func (d *Database) CollectAndMarkBlocksAsProcessing(ctx context.Context, batch int) (blocks.BlocksSynced, error) { - var blks blocks.BlocksSynced - processedAt := timeNow() - if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { - if _, err := tx.QueryContext(ctx, &blks, - `with toProcess as ( - select cid, height, rank() over (order by height) as rnk - from blocks_synced - where completed_at is null and - processed_at is null and - height > 0 - ) - select cid - from toProcess - where rnk <= ? - for update skip locked`, // ensure that only a single process can select and update blocks as processing. - batch, - ); err != nil { - return err - } - for _, blk := range blks { - if _, err := tx.ModelContext(ctx, blk).Set("processed_at = ?", processedAt). - WherePK(). - Update(); err != nil { - return xerrors.Errorf("marking block as processed: %w", err) - } - } - return nil - }); err != nil { - return nil, err - } - return blks, nil -} - -func (d *Database) MarkBlocksAsProcessed(ctx context.Context, blks visor.ProcessingStateChangeList) error { - return d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { - completedAt := timeNow() - for _, blk := range blks { - if _, err := tx.ModelContext(ctx, &blk).Set("completed_at = ?", completedAt). - WherePK(). - Update(); err != nil { - return err - } - } - return nil - }) -} - // VerifyCurrentSchema compares the schema present in the database with the models used by visor // and returns an error if they are incompatible func (d *Database) VerifyCurrentSchema(ctx context.Context) error { diff --git a/tasks/indexer/tipset.go b/tasks/indexer/tipset.go index 7e18ad29c..d35a9874a 100644 --- a/tasks/indexer/tipset.go +++ b/tasks/indexer/tipset.go @@ -2,14 +2,11 @@ package indexer import ( "errors" - "fmt" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lotus/chain/types" ) -var _ = fmt.Printf - var ErrCacheEmpty = errors.New("cache empty") var ErrAddOutOfOrder = errors.New("added tipset height lower than current head") var ErrRevertOutOfOrder = errors.New("reverted tipset does not match current head")
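Each of the removed task files wires up gocraft/work the same way: a worker pool whose per-job context is filled in by a closure middleware (giving workers access to the lotus API and the publish channel), a logging middleware, and a job registered with MaxFails: 1 so failed jobs are not retried. A minimal, self-contained sketch of that pattern, assuming a hypothetical "greet" job and a local Redis at localhost:6379 (neither is part of the original code):

package main

import (
	"log"
	"os"
	"os/signal"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

// jobContext carries the dependencies each worker needs; middleware fills it in,
// much like the closure middleware in the removed Setup functions.
type jobContext struct {
	greeting string // stand-in for the lotus API / publish channel fields
}

// Greet is the job handler; arguments are read with ArgString and checked with ArgError.
func (c *jobContext) Greet(job *work.Job) error {
	name := job.ArgString("name")
	if err := job.ArgError(); err != nil {
		return err
	}
	log.Printf("%s, %s", c.greeting, name)
	return nil
}

func main() {
	redisPool := &redis.Pool{
		Dial: func() (redis.Conn, error) { return redis.Dial("tcp", "localhost:6379") },
	}

	pool := work.NewWorkerPool(jobContext{}, 4, "example_ns", redisPool)

	// Inject dependencies into every job context before the handler runs.
	pool.Middleware(func(c *jobContext, job *work.Job, next work.NextMiddlewareFunc) error {
		c.greeting = "hello"
		return next()
	})

	// MaxFails: 1 disables retries, matching the removed tasks.
	pool.JobWithOptions("greet", work.JobOptions{MaxFails: 1}, (*jobContext).Greet)

	// Producers enqueue jobs into the same namespace.
	enqueuer := work.NewEnqueuer("example_ns", redisPool)
	if _, err := enqueuer.Enqueue("greet", work.Q{"name": "world"}); err != nil {
		log.Fatal(err)
	}

	pool.Start()
	defer pool.Stop()

	// Block until interrupted so the workers get a chance to drain the queue.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, os.Interrupt)
	<-sig
}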
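The removed actor tasks share the lotus state-diffing pattern: build predicates with state.NewStatePredicates, compose them for the actor and field of interest, run the composed predicate over the parent and child tipset keys, and type-assert the opaque result. A condensed sketch of the market deal-state variant, mirroring the removed marketDealStateChanges (the lens.API parameter stands in for whatever node handle the caller already holds):

package sketch

import (
	"context"

	"golang.org/x/xerrors"

	market "github.com/filecoin-project/lotus/chain/actors/builtin/market"
	"github.com/filecoin-project/lotus/chain/events/state"
	"github.com/filecoin-project/lotus/chain/types"

	"github.com/filecoin-project/sentinel-visor/lens"
)

// DealStateDiff returns the deal states added or modified between the parent
// tipset (pts) and the child tipset (ts), or nil if the storage market actor
// did not change between them.
func DealStateDiff(ctx context.Context, node lens.API, pts, ts types.TipSetKey) (*market.DealStateChanges, error) {
	pred := state.NewStatePredicates(node)
	stateDiff := pred.OnStorageMarketActorChanged(pred.OnDealStateChanged(pred.OnDealStateAmtChanged()))

	changed, val, err := stateDiff(ctx, pts, ts)
	if err != nil {
		return nil, err
	}
	if !changed {
		return nil, nil
	}

	changes, ok := val.(*market.DealStateChanges)
	if !ok {
		// A mismatch here indicates a developer error or a breaking change in lotus.
		return nil, xerrors.Errorf("unexpected type returned by deal state predicate: %T", val)
	}
	return changes, nil
}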
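The removed message task walks every block in a tipset and gathers both BLS and secp messages, deduplicating with a seen-set keyed by message CID because the same message normally appears in several blocks of the same tipset. The core of that loop, stripped of the model building (lens.API again stands in for the node handle used by the removed code):

package sketch

import (
	"context"

	"github.com/ipfs/go-cid"

	"github.com/filecoin-project/lotus/chain/types"

	"github.com/filecoin-project/sentinel-visor/lens"
)

// UniqueTipSetMessages returns the distinct messages referenced by the blocks of
// the given tipset key.
func UniqueTipSetMessages(ctx context.Context, node lens.API, tsk types.TipSetKey) ([]*types.Message, error) {
	seen := map[cid.Cid]struct{}{}
	var out []*types.Message

	for _, blk := range tsk.Cids() {
		blkMsgs, err := node.ChainGetBlockMessages(ctx, blk)
		if err != nil {
			return nil, err
		}

		// Flatten BLS and secp messages into one slice of unsigned messages.
		all := make([]*types.Message, 0, len(blkMsgs.Cids))
		all = append(all, blkMsgs.BlsMessages...)
		for _, m := range blkMsgs.SecpkMessages {
			all = append(all, &m.Message)
		}

		for _, m := range all {
			c := m.Cid()
			if _, ok := seen[c]; ok {
				continue // already collected via another block in this tipset
			}
			seen[c] = struct{}{}
			out = append(out, m)
		}
	}
	return out, nil
}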
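The removed CollectAndMarkBlocksAsProcessing leaned on Postgres' FOR UPDATE SKIP LOCKED so that concurrent visor processes could claim disjoint batches of unprocessed blocks without blocking each other. A sketch of the same claim-and-mark idea with go-pg, folded into a single UPDATE ... RETURNING statement rather than the original CTE-plus-rank() query; the table and column names follow the removed blocks_synced schema, and claimedBlock is a throwaway scan target, not a real project model:

package sketch

import (
	"context"

	"github.com/go-pg/pg/v10"
)

// claimedBlock receives the rows claimed by this process.
type claimedBlock struct {
	Cid    string
	Height int64
}

// ClaimUnprocessedBlocks marks up to batch unprocessed rows as being processed
// and returns them. SKIP LOCKED makes competing transactions skip rows that are
// already locked, so two workers never claim the same block.
func ClaimUnprocessedBlocks(ctx context.Context, db *pg.DB, batch int) ([]claimedBlock, error) {
	var blks []claimedBlock
	_, err := db.QueryContext(ctx, &blks, `
		update blocks_synced
		set processed_at = now()
		where cid in (
			select cid
			from blocks_synced
			where completed_at is null
			  and processed_at is null
			  and height > 0
			order by height
			limit ?
			for update skip locked
		)
		returning cid, height`, batch)
	if err != nil {
		return nil, err
	}
	return blks, nil
}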