Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: improve distributed indexing metrics and logging #981

Merged
merged 1 commit into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions chain/indexer/distributed/queue/tasks/gapfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/hibiken/asynq"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tracing"
Expand All @@ -21,14 +24,32 @@ const (
)

type GapFillTipSetPayload struct {
TipSet *types.TipSet
// TODO: include the height of the tipset here so the epoch can be correctly marked as gap filled when null rounds are involved.
TipSet *types.TipSet
Tasks []string
TraceCarrier *tracing.TraceCarrier `json:",omitempty"`
}

// Attributes returns the payload's identifying fields as OpenTelemetry
// attributes for populating tracing span attributes: the requested task names
// under "tasks" and, when a tipset is present, its epoch under "height" and
// its key under "tipset".
//
// The tipset-derived attributes are omitted when TipSet is nil (for example a
// payload decoded from JSON that carried no tipset), so annotating a span
// cannot panic on a nil dereference.
func (g GapFillTipSetPayload) Attributes() []attribute.KeyValue {
	if g.TipSet == nil {
		return []attribute.KeyValue{
			attribute.StringSlice("tasks", g.Tasks),
		}
	}
	return []attribute.KeyValue{
		attribute.Int64("height", int64(g.TipSet.Height())),
		attribute.String("tipset", g.TipSet.Key().String()),
		attribute.StringSlice("tasks", g.Tasks),
	}
}

// MarshalLogObject implements zapcore.ObjectMarshaler, allowing the payload to
// add itself to a structured logging context (e.g. via zap.Inline). It writes
// the tipset key under "tipset", the tipset epoch under "height" and the
// formatted task list under "tasks"; the TraceCarrier is not logged.
//
// It always returns nil.
func (g GapFillTipSetPayload) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	// This method is invoked while reporting failures; guard against a nil
	// tipset so that a panic inside the logger does not mask the original
	// error. The tipset fields are simply left out in that case.
	if g.TipSet != nil {
		enc.AddString("tipset", g.TipSet.Key().String())
		enc.AddInt64("height", int64(g.TipSet.Height()))
	}
	enc.AddString("tasks", fmt.Sprint(g.Tasks))
	return nil
}

// HasTraceCarrier returns true iff payload contains a trace.
func (g *GapFillTipSetPayload) HasTraceCarrier() bool {
func (g GapFillTipSetPayload) HasTraceCarrier() bool {
return !(g.TraceCarrier == nil)
}

Expand All @@ -54,32 +75,40 @@ func (gh *AsynqGapFillTipSetTaskHandler) HandleGapFillTipSetTask(ctx context.Con
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Infow("gap fill tipset", "tipset", p.TipSet.String(), "height", p.TipSet.Height(), "tasks", p.Tasks)

taskID := t.ResultWriter().TaskID()
log.Infow("gap fill tipset", "taskID", taskID, zap.Inline(p))

if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
}
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID()))
span.SetAttributes(p.Attributes()...)
}
}

success, err := gh.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks))
if err != nil {
if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) {
log.Errorw("failed to index tipset for gap fill", zap.Inline(p), "error", err)
// return SkipRetry to prevent the task from being retried since nodes do not contain the block
// TODO: later, reschedule task in "backfill" queue with lily nodes capable of syncing the required data.
return fmt.Errorf("indexing tipset for gap fill tipset.(height) %s.(%d): Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), err, asynq.SkipRetry)
return fmt.Errorf("indexing tipset for gap fill %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry)
} else {
return err
}
}
if !success {
log.Errorw("failed to gap fill task successfully", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String())
return fmt.Errorf("gap filling tipset.(height) %s.(%d)", p.TipSet.Key(), p.TipSet.Height())
log.Errorw("failed to gap fill task successfully", "taskID", taskID, zap.Inline(p))
return fmt.Errorf("gap filling tipset.(height) %s.(%d) taskID: %s", p.TipSet.Key(), p.TipSet.Height(), taskID)
} else {
if err := gh.db.SetGapsFilled(ctx, int64(p.TipSet.Height()), p.Tasks...); err != nil {
log.Errorw("failed to mark gap as filled", "error", err)
log.Errorw("failed to mark gap as filled", "taskID", taskID, zap.Inline(p), "error", err)
return err
}
}
log.Infow("gap fill tipset success", "taskID", taskID, zap.Inline(p))
return nil
}
43 changes: 36 additions & 7 deletions chain/indexer/distributed/queue/tasks/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import (
"github.com/filecoin-project/lotus/chain/types"
"github.com/hibiken/asynq"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tracing"
Expand All @@ -28,8 +31,27 @@ type IndexTipSetPayload struct {
TraceCarrier *tracing.TraceCarrier `json:",omitempty"`
}

// Attributes returns the payload's identifying fields as OpenTelemetry
// attributes for populating tracing span attributes: the requested task names
// under "tasks" and, when a tipset is present, its epoch under "height" and
// its key under "tipset".
//
// The tipset-derived attributes are omitted when TipSet is nil (for example a
// payload decoded from JSON that carried no tipset), so annotating a span
// cannot panic on a nil dereference.
func (i IndexTipSetPayload) Attributes() []attribute.KeyValue {
	if i.TipSet == nil {
		return []attribute.KeyValue{
			attribute.StringSlice("tasks", i.Tasks),
		}
	}
	return []attribute.KeyValue{
		attribute.Int64("height", int64(i.TipSet.Height())),
		attribute.String("tipset", i.TipSet.Key().String()),
		attribute.StringSlice("tasks", i.Tasks),
	}
}

// MarshalLogObject implements zapcore.ObjectMarshaler, allowing the payload to
// add itself to a structured logging context (e.g. via zap.Inline). It writes
// the tipset key under "tipset", the tipset epoch under "height" and the
// formatted task list under "tasks"; the TraceCarrier is not logged.
//
// It always returns nil.
func (i IndexTipSetPayload) MarshalLogObject(enc zapcore.ObjectEncoder) error {
	// This method is invoked while reporting failures; guard against a nil
	// tipset so that a panic inside the logger does not mask the original
	// error. The tipset fields are simply left out in that case.
	if i.TipSet != nil {
		enc.AddString("tipset", i.TipSet.Key().String())
		enc.AddInt64("height", int64(i.TipSet.Height()))
	}
	enc.AddString("tasks", fmt.Sprint(i.Tasks))
	return nil
}

// HasTraceCarrier returns true iff payload contains a trace.
func (i *IndexTipSetPayload) HasTraceCarrier() bool {
func (i IndexTipSetPayload) HasTraceCarrier() bool {
return !(i.TraceCarrier == nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is access by value to prevent race conditions on i.TraceCarrier state here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because we only ever use it as a value within methods, e.g. it's instantiated as a value here: https://github.com/filecoin-project/lily/pull/981/files#diff-583b89249d2ef998d0f072521baf5ae75d506958a317c2ea3e6b6ad74ac7551eR75

}

Expand All @@ -54,27 +76,34 @@ func (ih *AsynqTipSetTaskHandler) HandleIndexTipSetTask(ctx context.Context, t *
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return err
}
log.Infow("indexing tipset", "tipset", p.TipSet.String(), "height", p.TipSet.Height(), "tasks", p.Tasks)

taskID := t.ResultWriter().TaskID()
log.Infow("indexing tipset", "taskID", taskID, zap.Inline(p))

if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
}
span := trace.SpanFromContext(ctx)
if span.IsRecording() {
span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID()))
span.SetAttributes(p.Attributes()...)
}
}

success, err := ih.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks))
if err != nil {
if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) {
log.Errorw("failed to index tipset", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String(), "error", err)
log.Errorw("failed to index tipset", zap.Inline(p), "error", err)
// return SkipRetry to prevent the task from being retried since nodes do not contain the block
// TODO: later, reschedule task in "backfill" queue with lily nodes capable of syncing the required data.
return fmt.Errorf("indexing tipset.(height) %s.(%d): Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), err, asynq.SkipRetry)
return fmt.Errorf("indexing tipset %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry)
}
return err
}
if !success {
log.Errorw("failed to index tipset successfully", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String())
return fmt.Errorf("indexing tipset.(height) %s.(%d)", p.TipSet.Key().String(), p.TipSet.Height())
log.Errorw("failed to index task successfully", "taskID", taskID, zap.Inline(p))
return fmt.Errorf("indexing tipset.(height) %s.(%d) taskID: %s", p.TipSet.Key(), p.TipSet.Height(), taskID)
}
log.Infow("index tipset success", "taskID", taskID, zap.Inline(p))
return nil
}
19 changes: 17 additions & 2 deletions chain/indexer/distributed/queue/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@ import (

"github.com/hibiken/asynq"
logging "github.com/ipfs/go-log/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/distributed"
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/storage"
)

Expand Down Expand Up @@ -45,6 +49,15 @@ func (t *AsynqWorker) Run(ctx context.Context) error {
t.server.ServerConfig.Logger = log.With("name", t.name)
t.server.ServerConfig.ErrorHandler = &WorkerErrorHandler{}

stats.Record(ctx, metrics.TipSetWorkerConcurrency.M(int64(t.server.ServerConfig.Concurrency)))
for queueName, priority := range t.server.ServerConfig.Queues {
if err := stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(metrics.QueueName, queueName)},
metrics.TipSetWorkerQueuePriority.M(int64(priority))); err != nil {
return err
}
}

server := asynq.NewServer(t.server.RedisConfig, t.server.ServerConfig)
if err := server.Start(mux); err != nil {
return err
Expand All @@ -66,25 +79,27 @@ func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task,
var p tasks.IndexTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
return
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
log.Errorw("task failed", zap.Inline(p), "type", task.Type(), "error", err)
case tasks.TypeGapFillTipSet:
var p tasks.GapFillTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
return
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
log.Errorw("task failed", zap.Inline(p), "type", task.Type(), "error", err)
}
}
9 changes: 9 additions & 0 deletions commands/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ import (
"github.com/mitchellh/go-homedir"
"github.com/multiformats/go-multiaddr"
"github.com/urfave/cli/v2"
"go.opencensus.io/stats"
"go.opencensus.io/tag"

"github.com/filecoin-project/lily/chain/indexer/distributed"
"github.com/filecoin-project/lily/commands/util"
"github.com/filecoin-project/lily/config"
"github.com/filecoin-project/lily/lens/lily"
"github.com/filecoin-project/lily/lens/lily/modules"
lutil "github.com/filecoin-project/lily/lens/util"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
"github.com/filecoin-project/lily/version"
)

var ClientAPIFlags struct {
Expand Down Expand Up @@ -205,6 +209,11 @@ Note that jobs are not persisted between restarts of the daemon. See
}
}

ctx, _ = tag.New(ctx,
tag.Insert(metrics.Version, version.String()),
)
stats.Record(ctx, metrics.LilyInfo.M(1))

if err := config.EnsureExists(daemonFlags.config); err != nil {
return fmt.Errorf("ensuring config is present at %q: %w", daemonFlags.config, err)
}
Expand Down
24 changes: 13 additions & 11 deletions commands/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,18 @@ func setupMetrics(flags LilyMetricOpts) error {
return err
}

inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: flags.RedisAddr,
DB: flags.RedisDB,
Password: flags.RedisPassword,
Username: flags.RedisUsername,
})
registry.MustRegister(
goCollector,
procCollector,
asynqmetrics.NewQueueMetricsCollector(inspector),
)
metricCollectors := []prom.Collector{goCollector, procCollector}
if flags.RedisNetwork != "" {
inspector := asynq.NewInspector(asynq.RedisClientOpt{
Addr: flags.RedisAddr,
DB: flags.RedisDB,
Password: flags.RedisPassword,
Username: flags.RedisUsername,
})
metricCollectors = append(metricCollectors, asynqmetrics.NewQueueMetricsCollector(inspector))
}
registry.MustRegister(metricCollectors...)

// register prometheus with opencensus
view.RegisterExporter(pe)
view.SetReportingPeriod(2 * time.Second)
Expand Down Expand Up @@ -139,6 +140,7 @@ func setupMetrics(flags LilyMetricOpts) error {
mux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
log.Infof("serving metrics on %s", flags.PrometheusPort)
if err := http.ListenAndServe(flags.PrometheusPort, mux); err != nil {
log.Fatalf("Failed to run Prometheus /metrics endpoint: %v", err)
}
Expand Down
Loading