feat: instrument with tracing #15

Merged
merged 3 commits into from
Sep 21, 2020
Changes from 2 commits
18 changes: 8 additions & 10 deletions Makefile
@@ -1,23 +1,21 @@
PG_IMAGE?=postgres:10
REDIS_IMAGE?=redis:6

.PHONY: deps
deps:
git submodule update --init --recursive

# test starts dependencies and runs all tests
.PHONY: test
test: pgstart testfull pgstop
test: dockerup testfull dockerdown

# pgstart starts postgres in docker
.PHONY: pgstart
pgstart:
docker run -d --name pg -p 5432:5432 -e POSTGRES_HOST_AUTH_METHOD=trust $(PG_IMAGE)
sleep 10
.PHONY: dockerup
dockerup:
docker-compose up -d

# pgstop stops postgres in docker
.PHONY: pgstop
pgstop:
docker rm -fv pg || true
.PHONY: dockerdown
dockerdown:
docker-compose down

# testfull runs all tests
.PHONY: testfull
27 changes: 27 additions & 0 deletions README.md
@@ -6,6 +6,33 @@ A component of [**Sentinel**](https://github.com/filecoin-project/sentinel), a c
A **Visor** process collects _permanent_ Filecoin chain metrics from a [**Lotus**](https://github.com/filecoin-project/lotus/) daemon, and writes them to a [**TimescaleDB**](https://github.com/timescale/timescaledb) time-series and relational datastore.


## Getting Started
Member


❤️ 🙏


### Usage

```
sentinel-visor [<flags>] <command>

Use 'sentinel-visor help <command>' to learn more about each command.
```

### Configuring Tracing

The global flag `--tracing=<bool>` turns tracing on or off. It is on by default.

Tracing expects a Jaeger server to be available. Configure the Jaeger settings using [environment variables](https://github.com/jaegertracing/jaeger-client-go#environment-variables). The most important are:

* `JAEGER_SERVICE_NAME` - name of the service (defaults to `sentinel-visor`).
* `JAEGER_AGENT_HOST` - hostname for communicating with Jaeger agent via UDP (defaults to `localhost`).
* `JAEGER_AGENT_PORT` - port for communicating with Jaeger agent via UDP (defaults to `6831`).

By default visor queries the Jaeger agent to determine what level of sampling is required (known as the `remote` sampling type). During testing it can be easier to override this and sample every trace by setting the following environment variables:

```
JAEGER_SAMPLER_TYPE=const JAEGER_SAMPLER_PARAM=1
```
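
As an illustration of the defaults listed above, the following Go sketch resolves the same environment variables with the documented fallbacks. It is not how visor or the Jaeger client reads its configuration; `jaegerSettings`, `loadJaegerSettings` and `envOr` are hypothetical names introduced here, and the `remote` fallback for `JAEGER_SAMPLER_TYPE` only mirrors the default described in the text.

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// jaegerSettings is a hypothetical struct used only for this illustration.
type jaegerSettings struct {
	ServiceName string
	AgentAddr   string // host:port of the Jaeger agent (UDP)
	SamplerType string
}

// envOr returns the value reported by getenv for key, or def when it is empty.
func envOr(getenv func(string) string, key, def string) string {
	if v := getenv(key); v != "" {
		return v
	}
	return def
}

// loadJaegerSettings applies the defaults documented in the README.
// getenv is injected so the function can be exercised without touching
// the real process environment.
func loadJaegerSettings(getenv func(string) string) jaegerSettings {
	host := envOr(getenv, "JAEGER_AGENT_HOST", "localhost")
	port := envOr(getenv, "JAEGER_AGENT_PORT", "6831")
	return jaegerSettings{
		ServiceName: envOr(getenv, "JAEGER_SERVICE_NAME", "sentinel-visor"),
		AgentAddr:   net.JoinHostPort(host, port),
		SamplerType: envOr(getenv, "JAEGER_SAMPLER_TYPE", "remote"),
	}
}

func main() {
	// Resolve the settings from the real environment and print them.
	fmt.Printf("%+v\n", loadJaegerSettings(os.Getenv))
}
```

Running this with `JAEGER_SAMPLER_TYPE=const` set in the environment should show the sampler type overridden while the other fields keep their defaults.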

## Code of Conduct

Sentinel Visor follows the [Filecoin Project Code of Conduct](https://github.com/filecoin-project/community/blob/master/CODE_OF_CONDUCT.md). Before contributing, please acquaint yourself with our social courtesies and expectations.
38 changes: 36 additions & 2 deletions docker-compose.yml
@@ -2,9 +2,13 @@ version: '3.5'
services:
redis:
container_name: redis
image: redis
image: redis:6
ports:
- "6379:6379"
environment:
- ALLOW_EMPTY_PASSWORD=yes
volumes:
- redis-data:/var/lib/redis

redis-commander:
container_name: redis-commander
@@ -14,7 +18,9 @@ services:
- REDIS_HOSTS=local:redis:6379
ports:
- "8081:8081"

depends_on:
- redis

workerui:
restart: always
environment:
@@ -24,3 +30,31 @@ services:
dockerfile: Dockerfile.worker
ports:
- "8181:8181"

timescaledb:
container_name: timescaledb
image: timescale/timescaledb:latest-pg10
ports:
- "5432:5432"
environment:
- POSTGRES_PASSWORD=password
volumes:
- timescaledb:/var/lib/postgresql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s
timeout: 5s
retries: 5

jaeger:
container_name: jaeger
image: jaegertracing/all-in-one:1.19
ports:
- "6831:6831/udp"
- "5778:5778"
- "16686:16686"

volumes:
timescaledb:
grafana-data:
Contributor


Did you intend to include the grafana volume here?

Contributor Author


Nope. Fixed

redis-data:
3 changes: 3 additions & 0 deletions go.mod
@@ -16,11 +16,14 @@ require (
github.com/ipfs/go-log/v2 v2.1.2-0.20200626104915-0016c0b4b3e4
github.com/lib/pq v1.8.0
github.com/multiformats/go-multiaddr v0.3.1
github.com/opentracing/opentracing-go v1.2.0
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/testify v1.6.1
github.com/uber/jaeger-client-go v2.23.1+incompatible
github.com/urfave/cli/v2 v2.2.0
github.com/whyrusleeping/cbor-gen v0.0.0-20200826160007-0b9f6c5fb163
go.opentelemetry.io/otel v0.11.0
go.uber.org/zap v1.15.0
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
)
91 changes: 91 additions & 0 deletions lens/lotus/api.go
@@ -1,9 +1,16 @@
package lotus

import (
"context"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/sentinel-visor/lens"
"github.com/filecoin-project/specs-actors/actors/util/adt"
cid "github.com/ipfs/go-cid"
"github.com/opentracing/opentracing-go"
)

func NewAPIWrapper(node api.FullNode, store adt.Store) *APIWrapper {
@@ -23,3 +30,87 @@ type APIWrapper struct {
func (aw *APIWrapper) Store() adt.Store {
return aw.store
}

Member


A couple thoughts on the change: this is definitely an improvement, but I have some concerns --- some of which you already mentioned:

  • Are we going to grow this as the api consumed by Visor grows? Or is there some alternative?
  • Since we are wrapping the API would it be better to have it as a field instead of embedding?
  • Would it be more appropriate to add API tracing to lotus instead? (as a follow on later)
  • Should we include the arguments to these calls as tags on the Span?

Contributor Author


We'll need to grow this if we want complete tracing coverage (we may not) or we should attempt to get it into lotus instead. I'm not convinced we want every method covered, especially since we are concerned about performance.

Wrapping the API and making it unexported would help enforce some type safety.

We could include the arguments as tags. I don't know how useful it would be in general, so I avoided paying the marshalling cost of things like cids in this pass.
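
To make the two options discussed in this thread concrete, here is a minimal Go sketch of a wrapper that keeps the wrapped API in an unexported field (rather than embedding it) and records the call argument as a tag. It uses only the standard library; `chainReader`, `spanRecorder`, `tracedReader` and `fakeNode` are hypothetical stand-ins invented for this illustration, not the Lotus API or the opentracing types used in the PR.

```go
package main

import (
	"context"
	"fmt"
)

// chainReader is a hypothetical stand-in for the subset of the node API
// that the wrapper chooses to expose.
type chainReader interface {
	ChainReadObj(ctx context.Context, key string) ([]byte, error)
}

// spanRecorder is a hypothetical stand-in for a tracer: it only remembers
// the spans that have finished.
type spanRecorder struct {
	finished []string
}

// tracedReader holds the wrapped API in an unexported field instead of
// embedding it, so callers can reach only the methods declared here.
type tracedReader struct {
	node chainReader
	rec  *spanRecorder
}

// ChainReadObj forwards to the wrapped node and records a span that carries
// the argument as a tag once the call returns.
func (t *tracedReader) ChainReadObj(ctx context.Context, key string) ([]byte, error) {
	defer func() {
		t.rec.finished = append(t.rec.finished, "Lotus.ChainReadObj key="+key)
	}()
	return t.node.ChainReadObj(ctx, key)
}

// fakeNode is a trivial in-memory implementation used for the demo.
type fakeNode struct{}

func (fakeNode) ChainReadObj(ctx context.Context, key string) ([]byte, error) {
	return []byte("obj:" + key), nil
}

// demo performs one traced call and returns the data and recorded spans.
func demo() (string, []string) {
	rec := &spanRecorder{}
	api := &tracedReader{node: fakeNode{}, rec: rec}
	data, _ := api.ChainReadObj(context.Background(), "bafy-example")
	return string(data), rec.finished
}

func main() {
	data, spans := demo()
	fmt.Println(data, spans)
}
```

The trade-off is the one raised above: with a field, every method to be traced must be written out explicitly, whereas embedding lets untraced methods pass through silently. Formatting the argument into a tag also has a cost on every call, which is the marshalling concern mentioned in the reply.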

func (aw *APIWrapper) ChainGetBlock(ctx context.Context, msg cid.Cid) (*types.BlockHeader, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.ChainGetBlock")
defer span.Finish()
return aw.FullNode.ChainGetBlock(ctx, msg)
}

func (aw *APIWrapper) ChainGetBlockMessages(ctx context.Context, msg cid.Cid) (*api.BlockMessages, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.ChainGetBlockMessages")
defer span.Finish()
return aw.FullNode.ChainGetBlockMessages(ctx, msg)
}

func (aw *APIWrapper) ChainGetGenesis(ctx context.Context) (*types.TipSet, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.ChainGetGenesis")
defer span.Finish()
return aw.FullNode.ChainGetGenesis(ctx)
}

func (aw *APIWrapper) ChainGetParentMessages(ctx context.Context, bcid cid.Cid) ([]api.Message, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.ChainGetParentMessages")
defer span.Finish()
return aw.FullNode.ChainGetParentMessages(ctx, bcid)
}

func (aw *APIWrapper) ChainGetParentReceipts(ctx context.Context, bcid cid.Cid) ([]*types.MessageReceipt, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.ChainGetParentReceipts")
defer span.Finish()
return aw.FullNode.ChainGetParentReceipts(ctx, bcid)
}

func (aw *APIWrapper) ChainGetTipSet(ctx context.Context, tsk types.TipSetKey) (*types.TipSet, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.ChainGetTipSet")
defer span.Finish()
return aw.FullNode.ChainGetTipSet(ctx, tsk)
}

func (aw *APIWrapper) ChainNotify(ctx context.Context) (<-chan []*api.HeadChange, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.ChainNotify")
defer span.Finish()
return aw.FullNode.ChainNotify(ctx)
}

func (aw *APIWrapper) ChainReadObj(ctx context.Context, obj cid.Cid) ([]byte, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.ChainReadObj")
defer span.Finish()
return aw.FullNode.ChainReadObj(ctx, obj)
}

func (aw *APIWrapper) StateChangedActors(ctx context.Context, old cid.Cid, new cid.Cid) (map[string]types.Actor, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.StateChangedActors")
defer span.Finish()
return aw.FullNode.StateChangedActors(ctx, old, new)
}

func (aw *APIWrapper) StateGetActor(ctx context.Context, actor address.Address, tsk types.TipSetKey) (*types.Actor, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.StateGetActor")
defer span.Finish()
return aw.FullNode.StateGetActor(ctx, actor, tsk)
}

func (aw *APIWrapper) StateListActors(ctx context.Context, tsk types.TipSetKey) ([]address.Address, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.StateListActors")
defer span.Finish()
return aw.FullNode.StateListActors(ctx, tsk)
}

func (aw *APIWrapper) StateMarketDeals(ctx context.Context, tsk types.TipSetKey) (map[string]api.MarketDeal, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.StateMarketDeals")
defer span.Finish()
return aw.FullNode.StateMarketDeals(ctx, tsk)
}

func (aw *APIWrapper) StateMinerPower(ctx context.Context, addr address.Address, tsk types.TipSetKey) (*api.MinerPower, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.StateMinerPower")
defer span.Finish()
return aw.FullNode.StateMinerPower(ctx, addr, tsk)
}

func (aw *APIWrapper) StateMinerSectors(ctx context.Context, addr address.Address, filter *bitfield.BitField, filterOut bool, tsk types.TipSetKey) ([]*api.ChainSectorInfo, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "Lotus.StateMinerSectors")
defer span.Finish()
return aw.FullNode.StateMinerSectors(ctx, addr, filter, filterOut, tsk)
}
6 changes: 5 additions & 1 deletion lens/lotus/cachestore.go
@@ -4,9 +4,11 @@ import (
"bytes"
"context"
"fmt"

"github.com/filecoin-project/lotus/api"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
"github.com/opentracing/opentracing-go"
cbg "github.com/whyrusleeping/cbor-gen"
)

@@ -34,9 +36,11 @@ func (cs *CacheCtxStore) Context() context.Context {
}

func (cs *CacheCtxStore) Get(ctx context.Context, c cid.Cid, out interface{}) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "CacheCtxStore.Get")
defer span.Finish()
cu, ok := out.(cbg.CBORUnmarshaler)
if !ok {
return fmt.Errorf("out paramater does not implement CBORUnmarshaler")
return fmt.Errorf("out parameter does not implement CBORUnmarshaler")
}

// hit :)
5 changes: 5 additions & 0 deletions main.go
@@ -39,6 +39,11 @@ func main() {
EnvVars: []string{"GOLOG_LOG_LEVEL"},
Value: "debug",
},
&cli.BoolFlag{
Name: "tracing",
EnvVars: []string{"VISOR_TRACING"},
Value: true,
},
},
Commands: []*cli.Command{
runCmd,
4 changes: 4 additions & 0 deletions model/actors/market/dealproposal.go
@@ -3,7 +3,9 @@ package market
import (
"context"
"fmt"

"github.com/go-pg/pg/v10"
"github.com/opentracing/opentracing-go"
)

type MarketDealProposal struct {
@@ -39,6 +41,8 @@ func (dp *MarketDealProposal) PersistWithTx(ctx context.Context, tx *pg.Tx) erro
type MarketDealProposals []*MarketDealProposal

func (dps MarketDealProposals) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "MarketDealProposals.PersistWithTx", opentracing.Tags{"count": len(dps)})
defer span.Finish()
for _, dp := range dps {
if err := dp.PersistWithTx(ctx, tx); err != nil {
return err
4 changes: 4 additions & 0 deletions model/actors/market/dealstate.go
@@ -3,7 +3,9 @@ package market
import (
"context"
"fmt"

"github.com/go-pg/pg/v10"
"github.com/opentracing/opentracing-go"
)

type MarketDealState struct {
@@ -27,6 +29,8 @@ func (ds *MarketDealState) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
type MarketDealStates []*MarketDealState

func (dss MarketDealStates) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "MarketDealStates.PersistWithTx", opentracing.Tags{"count": len(dss)})
defer span.Finish()
for _, ds := range dss {
if err := ds.PersistWithTx(ctx, tx); err != nil {
return err
4 changes: 4 additions & 0 deletions model/actors/market/task.go
@@ -2,7 +2,9 @@ package market

import (
"context"

"github.com/go-pg/pg/v10"
"github.com/opentracing/opentracing-go"
)

type MarketTaskResult struct {
@@ -11,6 +13,8 @@ type MarketTaskResult struct {
}

func (mtr *MarketTaskResult) Persist(ctx context.Context, db *pg.DB) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "MarketTaskResult.Persist")
defer span.Finish()
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
if err := mtr.Proposals.PersistWithTx(ctx, tx); err != nil {
return err
10 changes: 4 additions & 6 deletions model/actors/miner/power.go
@@ -4,6 +4,7 @@ import (
"context"

"github.com/go-pg/pg/v10"
"github.com/opentracing/opentracing-go"
"golang.org/x/xerrors"
)

@@ -25,16 +26,13 @@ type MinerPower struct {

func (mp *MinerPower) Persist(ctx context.Context, db *pg.DB) error {
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
if _, err := tx.ModelContext(ctx, mp).
OnConflict("do nothing").
Insert(); err != nil {
return xerrors.Errorf("persisting miner power: %w", err)
}
return nil
return mp.PersistWithTx(ctx, tx)
})
}

func (mp *MinerPower) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "MinerPower.PersistWithTx")
defer span.Finish()
if _, err := tx.ModelContext(ctx, mp).
OnConflict("do nothing").
Insert(); err != nil {
11 changes: 5 additions & 6 deletions model/actors/miner/precommit.go
@@ -2,7 +2,9 @@ package miner

import (
"context"

"github.com/go-pg/pg/v10"
"github.com/opentracing/opentracing-go"
"golang.org/x/xerrors"
)

@@ -66,16 +68,13 @@ type MinerPreCommitInfos []*MinerPreCommitInfo

func (mpis MinerPreCommitInfos) Persist(ctx context.Context, db *pg.DB) error {
return db.RunInTransaction(ctx, func(tx *pg.Tx) error {
for _, mpi := range mpis {
if err := mpi.PersistWithTx(ctx, tx); err != nil {
return err
}
}
return nil
return mpis.PersistWithTx(ctx, tx)
})
}

func (mpis MinerPreCommitInfos) PersistWithTx(ctx context.Context, tx *pg.Tx) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "MinerPreCommitInfos.PersistWithTx", opentracing.Tags{"count": len(mpis)})
defer span.Finish()
for _, mpi := range mpis {
if err := mpi.PersistWithTx(ctx, tx); err != nil {
return err