Skip to content

Commit

Permalink
improve tracing data and include lotus tracing (#794)
Browse files Browse the repository at this point in the history
* chore: update opentelemetry and support traces from lotus

* chore: clean up tracing flags and setup code

* chore: remove redundant trace from actorstate task

* polish: instrument TipSetIndexer spans

* polish: instrument walker spans

* polish: instrument watcher spans

* polish: instrument GetExecutedAndBlockMessagesForTipset spans

* polish: allow docker-compose jaeger to work by default
  • Loading branch information
frrist authored Jan 17, 2022
1 parent f0e938a commit fe9a838
Show file tree
Hide file tree
Showing 37 changed files with 331 additions and 206 deletions.
58 changes: 50 additions & 8 deletions chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -158,9 +159,15 @@ func NewTipSetIndexer(node lens.API, d model.Storage, window time.Duration, name

// TipSet is called when a new tipset has been discovered
func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
ctx, span := otel.Tracer("").Start(ctx, "Indexer.TipSet")
ctx, span := otel.Tracer("").Start(ctx, "TipSetIndexer.TipSet")
if span.IsRecording() {
span.SetAttributes(attribute.String("tipset", ts.String()), attribute.Int64("height", int64(ts.Height())))
span.SetAttributes(
attribute.String("tipset", ts.String()),
attribute.Int64("height", int64(ts.Height())),
attribute.String("name", t.name),
attribute.String("window", t.window.String()),
attribute.StringSlice("tasks", t.tasks),
)
}
defer span.End()

Expand Down Expand Up @@ -211,6 +218,15 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
return nil
}

if span.IsRecording() {
span.SetAttributes(
attribute.String("next_tipset", next.String()),
attribute.Int64("next_height", int64(next.Height())),
attribute.String("current_tipset", current.String()),
attribute.Int64("current_height", int64(current.Height())),
)
}

ll := log.With("current", int64(current.Height()), "next", int64(next.Height()))
ll.Debugw("indexing tipset")

Expand Down Expand Up @@ -386,12 +402,14 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
if _, complete := completed[name]; !complete {
taskOutputs[name] = model.PersistableList{t.buildSkippedTipsetReport(ts, name, start, "indexer not ready")}
ll.Infow("task skipped", "task", name, "reason", "indexer not ready")
span.AddEvent(fmt.Sprintf("skipped task: %s", res.Task))
}
}
stats.Record(ctx, metrics.TipSetSkip.M(1))
goto persist
case res = <-results:
}
span.AddEvent(fmt.Sprintf("completed task: %s", res.Task))
inFlight--

llt := ll.With("task", res.Task)
Expand Down Expand Up @@ -450,8 +468,16 @@ persist:

// Persist all results
go func() {
ctx, persistSpan := otel.Tracer("").Start(ctx, "TipSetIndexer.Persist")
if persistSpan.IsRecording() {
persistSpan.SetAttributes(
attribute.String("tipset", ts.String()),
attribute.Int64("height", int64(ts.Height())),
)
}
// free up the slot when done
defer func() {
persistSpan.End()
<-t.persistSlot
}()

Expand All @@ -477,11 +503,13 @@ persist:
wg.Wait()
ll.Infow("tasks complete", "total_time", time.Since(start))
}()

return nil
}

func (t *TipSetIndexer) runProcessor(ctx context.Context, p TipSetProcessor, name string, ts *types.TipSet, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.Processor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down Expand Up @@ -540,9 +568,12 @@ func (t *TipSetIndexer) getGenesisActors(ctx context.Context) (map[string]lens.A
// and applies it to versions of state trees supporting it. These include Version 2 and 3 of the lotus state tree implementation.
// stateChangedActors will fall back to the lotus API method when the optimized diffing cannot be applied.
func (t *TipSetIndexer) stateChangedActors(ctx context.Context, old, new cid.Cid) (map[string]lens.ActorStateChange, error) {
ctx, span := otel.Tracer("").Start(ctx, "StateChangedActors")
ctx, span := otel.Tracer("").Start(ctx, "TipSetIndexer.StateChangedActors")
if span.IsRecording() {
span.SetAttributes(attribute.String("old", old.String()), attribute.String("new", new.String()))
span.SetAttributes(
attribute.String("old", old.String()),
attribute.String("new", new.String()),
)
}
defer span.End()

Expand All @@ -561,9 +592,7 @@ func (t *TipSetIndexer) stateChangedActors(ctx context.Context, old, new cid.Cid
}

if newVersion == oldVersion && (newVersion != types.StateTreeVersion0 && newVersion != types.StateTreeVersion1) {
if span.IsRecording() {
span.SetAttributes(attribute.String("diff", "fast"))
}
span.SetAttributes(attribute.String("diff", "fast"))
// TODO: replace hamt.UseTreeBitWidth and hamt.UseHashFunction with values based on network version
changes, err := hamt.Diff(ctx, t.node.Store(), t.node.Store(), oldRoot, newRoot, hamt.UseTreeBitWidth(5), hamt.UseHashFunction(func(input []byte) []byte {
res := sha256.Sum256(input)
Expand Down Expand Up @@ -609,6 +638,7 @@ func (t *TipSetIndexer) stateChangedActors(ctx context.Context, old, new cid.Cid
return out, nil
}
}
span.SetAttributes(attribute.String("diff", "slow"))
log.Debug("using slow state diff")
actors, err := t.node.StateChangedActors(ctx, old, new)
if err != nil {
Expand All @@ -626,6 +656,9 @@ func (t *TipSetIndexer) stateChangedActors(ctx context.Context, old, new cid.Cid
}

func (t *TipSetIndexer) runMessageProcessor(ctx context.Context, p MessageProcessor, name string, ts, pts *types.TipSet, emsgs []*lens.ExecutedMessage, blkMsgs []*lens.BlockMessages, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.MessageProcessor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down Expand Up @@ -653,6 +686,9 @@ func (t *TipSetIndexer) runMessageProcessor(ctx context.Context, p MessageProces
}

func (t *TipSetIndexer) runConsensusProcessor(ctx context.Context, p TipSetsProcessor, name string, ts, pts *types.TipSet, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.ConsensusProcessor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down Expand Up @@ -680,6 +716,9 @@ func (t *TipSetIndexer) runConsensusProcessor(ctx context.Context, p TipSetsProc
}

func (t *TipSetIndexer) runActorProcessor(ctx context.Context, p ActorProcessor, name string, ts, pts *types.TipSet, actors map[string]lens.ActorStateChange, emsgs []*lens.ExecutedMessage, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.ActorProcessor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down Expand Up @@ -707,6 +746,9 @@ func (t *TipSetIndexer) runActorProcessor(ctx context.Context, p ActorProcessor,
}

func (t *TipSetIndexer) runMessageExecutionProcessor(ctx context.Context, p MessageExecutionProcessor, name string, ts, pts *types.TipSet, imsgs []*lens.MessageExecution, results chan *TaskResult) {
ctx, span := otel.Tracer("").Start(ctx, fmt.Sprintf("TipSetIndexer.MessageExecutionProcessor.%s", name))
defer span.End()

ctx, _ = tag.New(ctx, tag.Upsert(metrics.TaskType, name))
stats.Record(ctx, metrics.TipsetHeight.M(int64(ts.Height())))
stop := metrics.Timer(ctx, metrics.ProcessingDuration)
Expand Down
15 changes: 12 additions & 3 deletions chain/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package chain

import (
"context"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/lens"
Expand Down Expand Up @@ -72,11 +70,20 @@ func (c *Walker) Done() <-chan struct{} {
}

func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet) error {
ctx, span := otel.Tracer("").Start(ctx, "Walker.WalkChain", trace.WithAttributes(attribute.Int64("height", c.maxHeight)))
ctx, span := otel.Tracer("").Start(ctx, "Walker.WalkChain")
if span.IsRecording() {
span.SetAttributes(
attribute.Int64("height", int64(ts.Height())),
attribute.String("tipset", ts.String()),
attribute.Int64("min_height", c.minHeight),
attribute.Int64("max_height", c.maxHeight),
)
}
defer span.End()

log.Debugw("found tipset", "height", ts.Height())
if err := c.obs.TipSet(ctx, ts); err != nil {
span.RecordError(err)
return xerrors.Errorf("notify tipset: %w", err)
}

Expand All @@ -90,6 +97,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)

ts, err = node.ChainGetTipSet(ctx, ts.Parents())
if err != nil {
span.RecordError(err)
return xerrors.Errorf("get tipset: %w", err)
}

Expand All @@ -99,6 +107,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)

log.Debugw("found tipset", "height", ts.Height())
if err := c.obs.TipSet(ctx, ts); err != nil {
span.RecordError(err)
return xerrors.Errorf("notify tipset: %w", err)
}

Expand Down
11 changes: 10 additions & 1 deletion chain/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package chain
import (
"context"
"errors"

"github.com/filecoin-project/lotus/chain/types"
"github.com/gammazero/workerpool"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/metrics"
Expand Down Expand Up @@ -115,6 +116,14 @@ func (c *Watcher) index(ctx context.Context, he *HeadEvent) error {

// maybeIndexTipSet is called when a new tipset has been discovered
func (c *Watcher) maybeIndexTipSet(ctx context.Context, ts *types.TipSet) error {
ctx, span := otel.Tracer("").Start(ctx, "Watcher.maybeIndexTipSet")
if span.IsRecording() {
span.SetAttributes(
attribute.String("tipset", ts.String()),
attribute.Int64("height", int64(ts.Height())),
)
}
defer span.End()
// Process the tipset if we can, otherwise skip it so we don't block if indexing is too slow
select {
case <-ctx.Done():
Expand Down
12 changes: 2 additions & 10 deletions commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/commands/util"
Expand Down Expand Up @@ -178,14 +176,8 @@ Note that jobs are not persisted between restarts of the daemon. See
return xerrors.Errorf("setup metrics: %w", err)
}

if VisorTracingFlags.Tracing {
tp, err := NewJaegerTraceProvider(VisorTracingFlags)
if err != nil {
return xerrors.Errorf("setup tracing: %w", err)
}
otel.SetTracerProvider(tp)
} else {
otel.SetTracerProvider(trace.NewNoopTracerProvider())
if err := setupTracing(VisorTracingFlags); err != nil {
return xerrors.Errorf("setup tracing: %w", err)
}

ctx := context.Background()
Expand Down
63 changes: 23 additions & 40 deletions commands/setup.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package commands

import (
"fmt"
octrace "go.opencensus.io/trace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/bridge/opencensus"
"net/http"
"net/http/pprof"
"strings"
Expand All @@ -16,10 +18,6 @@ import (
"github.com/prometheus/client_golang/prometheus/collectors"
"go.opencensus.io/stats/view"
"go.opencensus.io/zpages"
"go.opentelemetry.io/otel/exporters/trace/jaeger"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv"
"golang.org/x/xerrors"

"github.com/filecoin-project/lily/metrics"
Expand All @@ -36,11 +34,9 @@ type VisorLogOpts struct {
var VisorLogFlags VisorLogOpts

// VisorTracingOpts holds the tracing-related command line options.
type VisorTracingOpts struct {
// Tracing toggles installation of the Jaeger trace provider in the daemon command.
Tracing bool
// JaegerHost is the host part of the Jaeger collector endpoint (http://<host>:<port>/api/traces).
JaegerHost string
// JaegerPort is the port part of the Jaeger collector endpoint.
JaegerPort int
// JaegerName is the service name attached to exported traces.
JaegerName string
// JaegerSampleType is not read by any code visible here.
// NOTE(review): presumably selects the sampler kind — confirm against the flag definitions.
JaegerSampleType string
// Enabled gates setupTracing: when false no trace provider is installed.
Enabled bool
// ServiceName is passed to metrics.NewJaegerTraceProvider and is also used as the tracer name for the OpenCensus bridge.
ServiceName string
// ProviderURL is passed to metrics.NewJaegerTraceProvider.
// NOTE(review): presumably the Jaeger collector URL — confirm against metrics.NewJaegerTraceProvider.
ProviderURL string
// JaegerSamplerParam is the sampling ratio handed to the trace provider constructor.
JaegerSamplerParam float64
}

Expand All @@ -52,36 +48,6 @@ type VisorMetricOpts struct {

var VisorMetricFlags VisorMetricOpts

// NewJaegerTraceProvider builds an OpenTelemetry SDK tracer provider that
// exports spans to a Jaeger collector.
//
// The collector endpoint is http://<flags.JaegerHost>:<flags.JaegerPort>/api/traces
// and flags.JaegerName is recorded as the service name resource attribute.
//
// The sampler is chosen from flags.JaegerSamplerParam:
//   - strictly between 0 and 1: parent-based sampling with a trace-ID ratio of that value
//   - exactly 1: every trace is sampled
//   - anything else (<= 0 or > 1): no trace is sampled
//
// An error is returned only when the Jaeger exporter cannot be created.
func NewJaegerTraceProvider(flags VisorTracingOpts) (*sdktrace.TracerProvider, error) {
serviceName := flags.JaegerName
sampleRatio := flags.JaegerSamplerParam
agentEndpoint := fmt.Sprintf("http://%s:%d/api/traces", flags.JaegerHost, flags.JaegerPort)

log.Infow("creating jaeger trace provider", "name", serviceName, "ratio", sampleRatio, "endpoint", agentEndpoint)
// Pick the sampler from the configured ratio; out-of-range values fall through to NeverSample.
var sampler sdktrace.Sampler
if sampleRatio < 1 && sampleRatio > 0 {
sampler = sdktrace.ParentBased(sdktrace.TraceIDRatioBased(sampleRatio))
} else if sampleRatio == 1 {
sampler = sdktrace.AlwaysSample()
} else {
sampler = sdktrace.NeverSample()
}

exp, err := jaeger.NewRawExporter(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(agentEndpoint)))
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
// Always be sure to batch in production.
sdktrace.WithBatcher(exp),
sdktrace.WithSampler(sampler),
sdktrace.WithResource(resource.NewWithAttributes(
semconv.ServiceNameKey.String(serviceName),
)),
)
return tp, nil
}

func setupLogging(flags VisorLogOpts) error {
ll := flags.LogLevel
if err := logging.SetLogLevel("*", ll); err != nil {
Expand Down Expand Up @@ -161,3 +127,20 @@ func setupMetrics(flags VisorMetricOpts) error {
}()
return nil
}

// setupTracing installs the process-wide tracing configuration described by
// flags.
//
// When flags.Enabled is false the function returns nil without touching the
// global tracer provider. Otherwise it creates a Jaeger-backed trace provider
// from flags.ServiceName, flags.ProviderURL and flags.JaegerSamplerParam,
// registers it as the global OpenTelemetry tracer provider, and replaces the
// OpenCensus default tracer with a bridge onto it.
//
// An error is returned if the trace provider cannot be constructed.
func setupTracing(flags VisorTracingOpts) error {
	if !flags.Enabled {
		return nil
	}

	// Read every option from the flags argument rather than the package-level
	// VisorTracingFlags, so the function honours what the caller passed in.
	tp, err := metrics.NewJaegerTraceProvider(flags.ServiceName, flags.ProviderURL, flags.JaegerSamplerParam)
	if err != nil {
		// The caller already prefixes this error with "setup tracing", so
		// describe only the step that failed here.
		return xerrors.Errorf("new jaeger trace provider: %w", err)
	}
	otel.SetTracerProvider(tp)

	// Upgrade libraries (lotus) that still emit OpenCensus spans so they are
	// recorded through the OpenTelemetry provider, to facilitate a migration.
	octrace.DefaultTracer = opencensus.NewTracer(tp.Tracer(flags.ServiceName))

	return nil
}
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ services:
- "6831:6831/udp"
- "5778:5778"
- "16686:16686"
- "14268:14268"

prometheus:
image: prom/prometheus:v2.1.0
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ require (
github.com/urfave/cli/v2 v2.3.0
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.opentelemetry.io/otel v1.2.0
go.opentelemetry.io/otel/bridge/opencensus v0.25.0
go.opentelemetry.io/otel/exporters/jaeger v1.2.0
go.opentelemetry.io/otel/sdk v1.2.0
go.uber.org/fx v1.15.0
go.uber.org/zap v1.19.0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
Expand Down
Loading

0 comments on commit fe9a838

Please sign in to comment.