Skip to content

Commit

Permalink
chore: improve distributed indexing metrics and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Jun 9, 2022
1 parent 0464ef6 commit d0712f5
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 50 deletions.
47 changes: 38 additions & 9 deletions chain/indexer/distributed/queue/tasks/gapfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/hibiken/asynq"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tracing"
Expand All @@ -21,14 +24,32 @@ const (
)

type GapFillTipSetPayload struct {
TipSet *types.TipSet
// TODO: include the height of the tipset here so the epoch can be correctly marked as gap-filled even when it is a null round.
TipSet *types.TipSet
Tasks []string
TraceCarrier *tracing.TraceCarrier `json:",omitempty"`
}

// Attributes builds the set of tracing span attributes (height, tipset key,
// and task list) that describe this gap-fill payload.
func (g GapFillTipSetPayload) Attributes() []attribute.KeyValue {
	attrs := make([]attribute.KeyValue, 0, 3)
	attrs = append(attrs, attribute.Int64("height", int64(g.TipSet.Height())))
	attrs = append(attrs, attribute.String("tipset", g.TipSet.Key().String()))
	attrs = append(attrs, attribute.StringSlice("tasks", g.Tasks))
	return attrs
}

// MarshalLogObject implements zapcore.ObjectMarshaler, allowing the payload to
// add its fields to a zap logging context efficiently and to selectively omit
// information that shouldn't appear in logs (e.g., passwords).
func (g GapFillTipSetPayload) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	ts := g.TipSet
	enc.AddString("tipset", ts.Key().String())
	enc.AddInt64("height", int64(ts.Height()))
	enc.AddString("tasks", fmt.Sprint(g.Tasks))
	return nil
}

// HasTraceCarrier returns true iff payload contains a trace.
func (g *GapFillTipSetPayload) HasTraceCarrier() bool {
func (g GapFillTipSetPayload) HasTraceCarrier() bool {
return !(g.TraceCarrier == nil)
}

Expand All @@ -54,32 +75,40 @@ func (gh *AsynqGapFillTipSetTaskHandler) HandleGapFillTipSetTask(ctx context.Con
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Infow("gap fill tipset", "tipset", p.TipSet.String(), "height", p.TipSet.Height(), "tasks", p.Tasks)

taskID := t.ResultWriter().TaskID()
log.Infow("gap fill tipset", "taskID", taskID, zap.Inline(p))

if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
}
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID()))
span.SetAttributes(p.Attributes()...)
}
}

success, err := gh.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks))
if err != nil {
if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) {
log.Errorw("failed to index tipset for gap fill", zap.Inline(p), "error", err)
// return SkipRetry to prevent the task from being retried since nodes do not contain the block
// TODO: later, reschedule task in "backfill" queue with lily nodes capable of syncing the required data.
return fmt.Errorf("indexing tipset for gap fill tipset.(height) %s.(%d): Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), err, asynq.SkipRetry)
return fmt.Errorf("indexing tipset for gap fill %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry)
} else {
return err
}
}
if !success {
log.Errorw("failed to gap fill task successfully", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String())
return fmt.Errorf("gap filling tipset.(height) %s.(%d)", p.TipSet.Key(), p.TipSet.Height())
log.Errorw("failed to gap fill task successfully", "taskID", taskID, zap.Inline(p))
return fmt.Errorf("gap filling tipset.(height) %s.(%d) taskID: %s", p.TipSet.Key(), p.TipSet.Height(), taskID)
} else {
if err := gh.db.SetGapsFilled(ctx, int64(p.TipSet.Height()), p.Tasks...); err != nil {
log.Errorw("failed to mark gap as filled", "error", err)
log.Errorw("failed to mark gap as filled", "taskID", taskID, zap.Inline(p), "error", err)
return err
}
}
log.Infow("gap fill tipset success", "taskID", taskID, zap.Inline(p))
return nil
}
43 changes: 36 additions & 7 deletions chain/indexer/distributed/queue/tasks/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/hibiken/asynq"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tracing"
Expand All @@ -28,8 +31,27 @@ type IndexTipSetPayload struct {
TraceCarrier *tracing.TraceCarrier `json:",omitempty"`
}

// Attributes builds the set of tracing span attributes (height, tipset key,
// and task list) that describe this indexing payload.
func (i IndexTipSetPayload) Attributes() []attribute.KeyValue {
	attrs := make([]attribute.KeyValue, 0, 3)
	attrs = append(attrs, attribute.Int64("height", int64(i.TipSet.Height())))
	attrs = append(attrs, attribute.String("tipset", i.TipSet.Key().String()))
	attrs = append(attrs, attribute.StringSlice("tasks", i.Tasks))
	return attrs
}

// MarshalLogObject implements zapcore.ObjectMarshaler, allowing the payload to
// add its fields to a zap logging context efficiently and to selectively omit
// information that shouldn't appear in logs (e.g., passwords).
func (i IndexTipSetPayload) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	ts := i.TipSet
	enc.AddString("tipset", ts.Key().String())
	enc.AddInt64("height", int64(ts.Height()))
	enc.AddString("tasks", fmt.Sprint(i.Tasks))
	return nil
}

// HasTraceCarrier returns true iff payload contains a trace.
func (i *IndexTipSetPayload) HasTraceCarrier() bool {
func (i IndexTipSetPayload) HasTraceCarrier() bool {
return !(i.TraceCarrier == nil)
}

Expand All @@ -54,27 +76,34 @@ func (ih *AsynqTipSetTaskHandler) HandleIndexTipSetTask(ctx context.Context, t *
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Infow("indexing tipset", "tipset", p.TipSet.String(), "height", p.TipSet.Height(), "tasks", p.Tasks)

taskID := t.ResultWriter().TaskID()
log.Infow("indexing tipset", "taskID", taskID, zap.Inline(p))

if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
}
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID()))
span.SetAttributes(p.Attributes()...)
}
}

success, err := ih.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks))
if err != nil {
if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) {
log.Errorw("failed to index tipset", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String(), "error", err)
log.Errorw("failed to index tipset", zap.Inline(p), "error", err)
// return SkipRetry to prevent the task from being retried since nodes do not contain the block
// TODO: later, reschedule task in "backfill" queue with lily nodes capable of syncing the required data.
return fmt.Errorf("indexing tipset.(height) %s.(%d): Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), err, asynq.SkipRetry)
return fmt.Errorf("indexing tipset %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry)
}
return err
}
if !success {
log.Errorw("failed to index tipset successfully", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String())
return fmt.Errorf("indexing tipset.(height) %s.(%d)", p.TipSet.Key().String(), p.TipSet.Height())
log.Errorw("failed to index task successfully", "taskID", taskID, zap.Inline(p))
return fmt.Errorf("indexing tipset.(height) %s.(%d) taskID: %s", p.TipSet.Key(), p.TipSet.Height(), taskID)
}
log.Infow("index tipset success", "taskID", taskID, zap.Inline(p))
return nil
}
19 changes: 17 additions & 2 deletions chain/indexer/distributed/queue/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (

"github.com/hibiken/asynq"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/distributed"
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/storage"
)

Expand Down Expand Up @@ -45,6 +49,15 @@ func (t *AsynqWorker) Run(ctx context.Context) error {
t.server.ServerConfig.Logger = log.With("name", t.name)
t.server.ServerConfig.ErrorHandler = &WorkerErrorHandler{}

stats.Record(ctx, metrics.TipSetWorkerConcurrency.M(int64(t.server.ServerConfig.Concurrency)))
for queueName, priority := range t.server.ServerConfig.Queues {
if err := stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(metrics.QueueName, queueName)},
metrics.TipSetWorkerQueuePriority.M(int64(priority))); err != nil {
return err
}
}

server := asynq.NewServer(t.server.RedisConfig, t.server.ServerConfig)
if err := server.Start(mux); err != nil {
return err
Expand All @@ -66,25 +79,27 @@ func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task,
var p tasks.IndexTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
return
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
log.Errorw("task failed", zap.Inline(p), "type", task.Type(), "error", err)
case tasks.TypeGapFillTipSet:
var p tasks.GapFillTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
return
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
log.Errorw("task failed", zap.Inline(p), "type", task.Type(), "error", err)
}
}
9 changes: 9 additions & 0 deletions commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ import (
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"

"github.com/filecoin-project/lily/chain/indexer/distributed"
"github.com/filecoin-project/lily/commands/util"
"github.com/filecoin-project/lily/config"
"github.com/filecoin-project/lily/lens/lily"
"github.com/filecoin-project/lily/lens/lily/modules"
lutil "github.com/filecoin-project/lily/lens/util"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
"github.com/filecoin-project/lily/version"
)

var ClientAPIFlags struct {
Expand Down Expand Up @@ -205,6 +209,11 @@ Note that jobs are not persisted between restarts of the daemon. See
}
}

ctx, _ = tag.New(ctx,
tag.Insert(metrics.Version, version.String()),
)
stats.Record(ctx, metrics.LilyInfo.M(1))

if err := config.EnsureExists(daemonFlags.config); err != nil {
return fmt.Errorf("ensuring config is present at %q: %w", daemonFlags.config, err)
}
Expand Down
24 changes: 13 additions & 11 deletions commands/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,18 @@ func setupMetrics(flags LilyMetricOpts) error {
return err
}

inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: flags.RedisAddr,
DB: flags.RedisDB,
Password: flags.RedisPassword,
Username: flags.RedisUsername,
})
registry.MustRegister(
goCollector,
procCollector,
asynqmetrics.NewQueueMetricsCollector(inspector),
)
metricCollectors := []prom.Collector{goCollector, procCollector}
if flags.RedisNetwork != "" {
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: flags.RedisAddr,
DB: flags.RedisDB,
Password: flags.RedisPassword,
Username: flags.RedisUsername,
})
metricCollectors = append(metricCollectors, asynqmetrics.NewQueueMetricsCollector(inspector))
}
registry.MustRegister(metricCollectors...)

// register prometheus with opencensus
view.RegisterExporter(pe)
view.SetReportingPeriod(2 * time.Second)
Expand Down Expand Up @@ -139,6 +140,7 @@ func setupMetrics(flags LilyMetricOpts) error {
mux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
log.Infof("serving metrics on %s", flags.PrometheusPort)
if err := http.ListenAndServe(flags.PrometheusPort, mux); err != nil {
log.Fatalf("Failed to run Prometheus /metrics endpoint: %v", err)
}
Expand Down
Loading

0 comments on commit d0712f5

Please sign in to comment.