diff --git a/chain/indexer/distributed/queue/tasks/gapfill.go b/chain/indexer/distributed/queue/tasks/gapfill.go
index 0232ac9bd..2431b3ce0 100644
--- a/chain/indexer/distributed/queue/tasks/gapfill.go
+++ b/chain/indexer/distributed/queue/tasks/gapfill.go
@@ -9,7 +9,10 @@ import (
 	"github.com/filecoin-project/lotus/blockstore"
 	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/hibiken/asynq"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 
 	"github.com/filecoin-project/lily/chain/indexer"
 	"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tracing"
@@ -21,14 +24,32 @@ const (
 )
 
 type GapFillTipSetPayload struct {
-	TipSet *types.TipSet
-	// TODO include the height of the tipset here to correctly mark the epoch as gap filled cuz Null rounds
+	TipSet       *types.TipSet
 	Tasks        []string
 	TraceCarrier *tracing.TraceCarrier `json:",omitempty"`
 }
 
+// Attributes returns a slice of attributes for populating tracing span attributes.
+func (g GapFillTipSetPayload) Attributes() []attribute.KeyValue {
+	return []attribute.KeyValue{
+		attribute.Int64("height", int64(g.TipSet.Height())),
+		attribute.String("tipset", g.TipSet.Key().String()),
+		attribute.StringSlice("tasks", g.Tasks),
+	}
+}
+
+// MarshalLogObject implements ObjectMarshaler and allows user-defined types to efficiently add themselves to the
+// logging context, and to selectively omit information which shouldn't be
+// included in logs (e.g., passwords).
+func (g GapFillTipSetPayload) MarshalLogObject(enc zapcore.ObjectEncoder) error {
+	enc.AddString("tipset", g.TipSet.Key().String())
+	enc.AddInt64("height", int64(g.TipSet.Height()))
+	enc.AddString("tasks", fmt.Sprint(g.Tasks))
+	return nil
+}
+
 // HasTraceCarrier returns true iff payload contains a trace.
-func (g *GapFillTipSetPayload) HasTraceCarrier() bool {
+func (g GapFillTipSetPayload) HasTraceCarrier() bool {
 	return !(g.TraceCarrier == nil)
 }
 
@@ -54,32 +75,40 @@ func (gh *AsynqGapFillTipSetTaskHandler) HandleGapFillTipSetTask(ctx context.Con
 	if err := json.Unmarshal(t.Payload(), &p); err != nil {
 		return err
 	}
-	log.Infow("gap fill tipset", "tipset", p.TipSet.String(), "height", p.TipSet.Height(), "tasks", p.Tasks)
+
+	taskID := t.ResultWriter().TaskID()
+	log.Infow("gap fill tipset", "taskID", taskID, zap.Inline(p))
 
 	if p.HasTraceCarrier() {
 		if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
 			ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
 		}
+		span := trace.SpanFromContext(ctx)
+		if span.IsRecording() {
+			span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID()))
+			span.SetAttributes(p.Attributes()...)
+		}
 	}
 
 	success, err := gh.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks))
 	if err != nil {
 		if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) {
+			log.Errorw("failed to index tipset for gap fill", zap.Inline(p), "error", err)
 			// return SkipRetry to prevent the task from being retried since nodes do not contain the block
-			// TODO: later, reschedule task in "backfill" queue with lily nodes capable of syncing the required data.
-			return fmt.Errorf("indexing tipset for gap fill tipset.(height) %s.(%d): Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), err, asynq.SkipRetry)
+			return fmt.Errorf("indexing tipset for gap fill %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry)
 		} else {
 			return err
 		}
 	}
 
 	if !success {
-		log.Errorw("failed to gap fill task successfully", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String())
-		return fmt.Errorf("gap filling tipset.(height) %s.(%d)", p.TipSet.Key(), p.TipSet.Height())
+		log.Errorw("failed to gap fill task successfully", "taskID", taskID, zap.Inline(p))
+		return fmt.Errorf("gap filling tipset.(height) %s.(%d) taskID: %s", p.TipSet.Key(), p.TipSet.Height(), taskID)
 	} else {
 		if err := gh.db.SetGapsFilled(ctx, int64(p.TipSet.Height()), p.Tasks...); err != nil {
-			log.Errorw("failed to mark gap as filled", "error", err)
+			log.Errorw("failed to mark gap as filled", "taskID", taskID, zap.Inline(p), "error", err)
 			return err
 		}
 	}
+	log.Infow("gap fill tipset success", "taskID", taskID, zap.Inline(p))
 	return nil
 }
diff --git a/chain/indexer/distributed/queue/tasks/index.go b/chain/indexer/distributed/queue/tasks/index.go
index e0186f389..3608e5be0 100644
--- a/chain/indexer/distributed/queue/tasks/index.go
+++ b/chain/indexer/distributed/queue/tasks/index.go
@@ -10,7 +10,10 @@ import (
 	"github.com/filecoin-project/lotus/chain/types"
 	"github.com/hibiken/asynq"
 	logging "github.com/ipfs/go-log/v2"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
 
 	"github.com/filecoin-project/lily/chain/indexer"
 	"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tracing"
@@ -28,8 +31,27 @@ type IndexTipSetPayload struct {
 	TraceCarrier *tracing.TraceCarrier `json:",omitempty"`
 }
 
+// Attributes returns a slice of attributes for populating tracing span attributes.
+func (i IndexTipSetPayload) Attributes() []attribute.KeyValue {
+	return []attribute.KeyValue{
+		attribute.Int64("height", int64(i.TipSet.Height())),
+		attribute.String("tipset", i.TipSet.Key().String()),
+		attribute.StringSlice("tasks", i.Tasks),
+	}
+}
+
+// MarshalLogObject implements ObjectMarshaler and allows user-defined types to efficiently add themselves to the
+// logging context, and to selectively omit information which shouldn't be
+// included in logs (e.g., passwords).
+func (i IndexTipSetPayload) MarshalLogObject(enc zapcore.ObjectEncoder) error {
+	enc.AddString("tipset", i.TipSet.Key().String())
+	enc.AddInt64("height", int64(i.TipSet.Height()))
+	enc.AddString("tasks", fmt.Sprint(i.Tasks))
+	return nil
+}
+
 // HasTraceCarrier returns true iff payload contains a trace.
-func (i *IndexTipSetPayload) HasTraceCarrier() bool {
+func (i IndexTipSetPayload) HasTraceCarrier() bool {
 	return !(i.TraceCarrier == nil)
 }
 
@@ -54,27 +76,34 @@ func (ih *AsynqTipSetTaskHandler) HandleIndexTipSetTask(ctx context.Context, t *
 	if err := json.Unmarshal(t.Payload(), &p); err != nil {
 		return err
 	}
-	log.Infow("indexing tipset", "tipset", p.TipSet.String(), "height", p.TipSet.Height(), "tasks", p.Tasks)
+
+	taskID := t.ResultWriter().TaskID()
+	log.Infow("indexing tipset", "taskID", taskID, zap.Inline(p))
 
 	if p.HasTraceCarrier() {
 		if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
 			ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
 		}
+		span := trace.SpanFromContext(ctx)
+		if span.IsRecording() {
+			span.SetAttributes(attribute.String("taskID", t.ResultWriter().TaskID()))
+			span.SetAttributes(p.Attributes()...)
+		}
 	}
 
 	success, err := ih.indexer.TipSet(ctx, p.TipSet, indexer.WithTasks(p.Tasks))
 	if err != nil {
 		if strings.Contains(err.Error(), blockstore.ErrNotFound.Error()) {
-			log.Errorw("failed to index tipset", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String(), "error", err)
+			log.Errorw("failed to index tipset", zap.Inline(p), "error", err)
 			// return SkipRetry to prevent the task from being retried since nodes do not contain the block
-			// TODO: later, reschedule task in "backfill" queue with lily nodes capable of syncing the required data.
-			return fmt.Errorf("indexing tipset.(height) %s.(%d): Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), err, asynq.SkipRetry)
+			return fmt.Errorf("indexing tipset %s.(%d) taskID %s: Error %s : %w", p.TipSet.Key().String(), p.TipSet.Height(), taskID, err, asynq.SkipRetry)
 		}
 		return err
 	}
 
 	if !success {
-		log.Errorw("failed to index tipset successfully", "height", p.TipSet.Height(), "tipset", p.TipSet.Key().String())
-		return fmt.Errorf("indexing tipset.(height) %s.(%d)", p.TipSet.Key().String(), p.TipSet.Height())
+		log.Errorw("failed to index task successfully", "taskID", taskID, zap.Inline(p))
+		return fmt.Errorf("indexing tipset.(height) %s.(%d) taskID: %s", p.TipSet.Key(), p.TipSet.Height(), taskID)
 	}
+	log.Infow("index tipset success", "taskID", taskID, zap.Inline(p))
 	return nil
 }
diff --git a/chain/indexer/distributed/queue/worker.go b/chain/indexer/distributed/queue/worker.go
index 87022dc8f..474cf68d6 100644
--- a/chain/indexer/distributed/queue/worker.go
+++ b/chain/indexer/distributed/queue/worker.go
@@ -6,11 +6,15 @@ import (
 
 	"github.com/hibiken/asynq"
 	logging "github.com/ipfs/go-log/v2"
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
 	"go.opentelemetry.io/otel/trace"
+	"go.uber.org/zap"
 
 	"github.com/filecoin-project/lily/chain/indexer"
 	"github.com/filecoin-project/lily/chain/indexer/distributed"
 	"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
+	"github.com/filecoin-project/lily/metrics"
 	"github.com/filecoin-project/lily/storage"
 )
 
@@ -45,6 +49,15 @@ func (t *AsynqWorker) Run(ctx context.Context) error {
 	t.server.ServerConfig.Logger = log.With("name", t.name)
 	t.server.ServerConfig.ErrorHandler = &WorkerErrorHandler{}
 
+	stats.Record(ctx, metrics.TipSetWorkerConcurrency.M(int64(t.server.ServerConfig.Concurrency)))
+	for queueName, priority := range t.server.ServerConfig.Queues {
+		if err := stats.RecordWithTags(ctx,
+			[]tag.Mutator{tag.Upsert(metrics.QueueName, queueName)},
+			metrics.TipSetWorkerQueuePriority.M(int64(priority))); err != nil {
+			return err
+		}
+	}
+
 	server := asynq.NewServer(t.server.RedisConfig, t.server.ServerConfig)
 	if err := server.Start(mux); err != nil {
 		return err
@@ -66,6 +79,7 @@ func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task,
 		var p tasks.IndexTipSetPayload
 		if err := json.Unmarshal(task.Payload(), &p); err != nil {
 			log.Errorw("failed to decode task type (developer error?)", "error", err)
+			return
 		}
 		if p.HasTraceCarrier() {
 			if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
@@ -73,11 +87,12 @@ func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task,
 				trace.SpanFromContext(ctx).RecordError(err)
 			}
 		}
-		log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
+		log.Errorw("task failed", zap.Inline(p), "type", task.Type(), "error", err)
 	case tasks.TypeGapFillTipSet:
 		var p tasks.GapFillTipSetPayload
 		if err := json.Unmarshal(task.Payload(), &p); err != nil {
 			log.Errorw("failed to decode task type (developer error?)", "error", err)
+			return
 		}
 		if p.HasTraceCarrier() {
 			if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
@@ -85,6 +100,6 @@ func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task,
 				trace.SpanFromContext(ctx).RecordError(err)
 			}
 		}
-		log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
+		log.Errorw("task failed", zap.Inline(p), "type", task.Type(), "error", err)
 	}
 }
diff --git a/commands/daemon.go b/commands/daemon.go
index 2d20de348..a6546fddf 100644
--- a/commands/daemon.go
+++ b/commands/daemon.go
@@ -20,6 +20,8 @@ import (
 	"github.com/mitchellh/go-homedir"
 	"github.com/multiformats/go-multiaddr"
 	"github.com/urfave/cli/v2"
+	"go.opencensus.io/stats"
+	"go.opencensus.io/tag"
 
 	"github.com/filecoin-project/lily/chain/indexer/distributed"
 	"github.com/filecoin-project/lily/commands/util"
@@ -27,8 +29,10 @@ import (
 	"github.com/filecoin-project/lily/lens/lily"
 	"github.com/filecoin-project/lily/lens/lily/modules"
 	lutil "github.com/filecoin-project/lily/lens/util"
+	"github.com/filecoin-project/lily/metrics"
 	"github.com/filecoin-project/lily/schedule"
 	"github.com/filecoin-project/lily/storage"
+	"github.com/filecoin-project/lily/version"
 )
 
 var ClientAPIFlags struct {
@@ -205,6 +209,11 @@ Note that jobs are not persisted between restarts of the daemon. See
 			}
 		}
 
+		ctx, _ = tag.New(ctx,
+			tag.Insert(metrics.Version, version.String()),
+		)
+		stats.Record(ctx, metrics.LilyInfo.M(1))
+
 		if err := config.EnsureExists(daemonFlags.config); err != nil {
 			return fmt.Errorf("ensuring config is present at %q: %w", daemonFlags.config, err)
 		}
diff --git a/commands/setup.go b/commands/setup.go
index 47df3ffb0..c9f7cae37 100644
--- a/commands/setup.go
+++ b/commands/setup.go
@@ -94,17 +94,18 @@ func setupMetrics(flags LilyMetricOpts) error {
 		return err
 	}
 
-	inspector := asynq.NewInspector(asynq.RedisClientOpt{
-		Addr:     flags.RedisAddr,
-		DB:       flags.RedisDB,
-		Password: flags.RedisPassword,
-		Username: flags.RedisUsername,
-	})
-	registry.MustRegister(
-		goCollector,
-		procCollector,
-		asynqmetrics.NewQueueMetricsCollector(inspector),
-	)
+	metricCollectors := []prom.Collector{goCollector, procCollector}
+	if flags.RedisNetwork != "" {
+		inspector := asynq.NewInspector(asynq.RedisClientOpt{
+			Addr:     flags.RedisAddr,
+			DB:       flags.RedisDB,
+			Password: flags.RedisPassword,
+			Username: flags.RedisUsername,
+		})
+		metricCollectors = append(metricCollectors, asynqmetrics.NewQueueMetricsCollector(inspector))
+	}
+	registry.MustRegister(metricCollectors...)
 
+	// register prometheus with opencensus
 	view.RegisterExporter(pe)
 	view.SetReportingPeriod(2 * time.Second)
 
@@ -139,6 +140,7 @@ func setupMetrics(flags LilyMetricOpts) error {
 	mux.Handle("/debug/pprof/heap", pprof.Handler("heap"))
 	mux.Handle("/debug/pprof/mutex", pprof.Handler("mutex"))
 	mux.Handle("/debug/pprof/threadcreate", pprof.Handler("threadcreate"))
+	log.Infof("serving metrics on %s", flags.PrometheusPort)
 	if err := http.ListenAndServe(flags.PrometheusPort, mux); err != nil {
 		log.Fatalf("Failed to run Prometheus /metrics endpoint: %v", err)
 	}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index d0f927d96..17626533c 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -12,29 +12,39 @@ import (
 var defaultMillisecondsDistribution = view.Distribution(0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1, 2, 3, 4, 5, 6, 8, 10, 13, 16, 20, 25, 30, 40, 50, 65, 80, 100, 130, 160, 200, 250, 300, 400, 500, 650, 800, 1000, 2000, 5000, 10000, 20000, 30000, 50000, 100000, 200000, 500000, 1000000, 2000000, 5000000, 10000000, 10000000)
 
 var (
-	TaskType, _ = tag.NewKey("task")  // name of task processor
-	Job, _      = tag.NewKey("job")   // name of job
-	Name, _     = tag.NewKey("name")  // name of running instance of visor
-	Table, _    = tag.NewKey("table") // name of table data is persisted for
+	Version, _  = tag.NewKey("version")
+	TaskType, _ = tag.NewKey("task")     // name of task processor
+	Job, _      = tag.NewKey("job")      // name of job
+	JobType, _  = tag.NewKey("job_type") // type of job (walk, watch, fill, find, watch-notify, walk-notify, etc.)
+	Name, _     = tag.NewKey("name")     // name of running instance of visor
+	Table, _    = tag.NewKey("table")    // name of table data is persisted for
 	ConnState, _ = tag.NewKey("conn_state")
 	API, _       = tag.NewKey("api")        // name of method on lotus api
 	ActorCode, _ = tag.NewKey("actor_code") // human readable code of actor being processed
+	// distributed tipset worker
+	QueueName = tag.MustNewKey("queue")
 )
 
 var (
+	// Common State
+
+	LilyInfo = stats.Int64("lily_info", "Arbitrary counter to tag lily info to", stats.UnitDimensionless)
+
+	// Indexer State
+
 	ProcessingDuration      = stats.Float64("processing_duration_ms", "Time taken to process a task", stats.UnitMilliseconds)
 	StateExtractionDuration = stats.Float64("state_extraction_duration_ms", "Time taken to extract an actor state", stats.UnitMilliseconds)
 	PersistDuration         = stats.Float64("persist_duration_ms", "Duration of a models persist operation", stats.UnitMilliseconds)
 	PersistModel            = stats.Int64("persist_model", "Number of models persisted", stats.UnitDimensionless)
 	DBConns                 = stats.Int64("db_conns", "Database connections held", stats.UnitDimensionless)
-	LensRequestDuration     = stats.Float64("lens_request_duration_ms", "Duration of lotus api requets", stats.UnitMilliseconds)
 	TipsetHeight            = stats.Int64("tipset_height", "The height of the tipset being processed by a task", stats.UnitDimensionless)
 	ProcessingFailure       = stats.Int64("processing_failure", "Number of processing failures", stats.UnitDimensionless)
 	PersistFailure          = stats.Int64("persist_failure", "Number of persistence failures", stats.UnitDimensionless)
 	WatchHeight             = stats.Int64("watch_height", "The height of the tipset last seen by the watch command", stats.UnitDimensionless)
 	TipSetSkip              = stats.Int64("tipset_skip", "Number of tipsets that were not processed. This is is an indication that lily cannot keep up with chain.", stats.UnitDimensionless)
 	JobStart                = stats.Int64("job_start", "Number of jobs started", stats.UnitDimensionless)
+	JobRunning              = stats.Int64("job_running", "Number of jobs currently running", stats.UnitDimensionless)
 	JobComplete             = stats.Int64("job_complete", "Number of jobs completed without error", stats.UnitDimensionless)
 	JobError                = stats.Int64("job_error", "Number of jobs stopped due to a fatal error", stats.UnitDimensionless)
 	JobTimeout              = stats.Int64("job_timeout", "Number of jobs stopped due to taking longer than expected", stats.UnitDimensionless)
@@ -44,6 +54,8 @@ var (
 	WatcherActiveWorkers  = stats.Int64("watcher_active_workers", "Current number of tipset indexers executing", stats.UnitDimensionless)
 	WatcherWaitingWorkers = stats.Int64("watcher_waiting_workers", "Current number of tipset indexers waiting to execute", stats.UnitDimensionless)
 
+	// DataSource API
+
 	DataSourceSectorDiffCacheHit    = stats.Int64("data_source_sector_diff_cache_hit", "Number of cache hits for sector diff", stats.UnitDimensionless)
 	DataSourceSectorDiffRead        = stats.Int64("data_source_sector_diff_read", "Number of reads for sector diff", stats.UnitDimensionless)
 	DataSourcePreCommitDiffCacheHit = stats.Int64("data_source_precommit_diff_cache_hit", "Number of cache hits for precommit diff", stats.UnitDimensionless)
@@ -54,9 +66,29 @@ var (
 	DataSourceExecutedAndBlockMessagesCacheHit = stats.Int64("data_source_executed_block_messages_cache_hig", "Number of cache hits for executed block messages", stats.UnitDimensionless)
 	DataSourceActorStateChangesFastDiff        = stats.Int64("data_source_actor_state_changes_fast_diff", "Number of fast diff operations performed for actor state changes", stats.UnitDimensionless)
 	DataSourceActorStateChangesSlowDiff        = stats.Int64("data_source_actor_state_changes_slow_diff", "Number of slow diff operations performed for actor state changes", stats.UnitDimensionless)
+
+	// Distributed Indexer
+
+	TipSetWorkerConcurrency   = stats.Int64("tipset_worker_concurrency", "Concurrency of tipset worker", stats.UnitDimensionless)
+	TipSetWorkerQueuePriority = stats.Int64("tipset_worker_queue_priority", "Priority of tipset worker queue", stats.UnitDimensionless)
 )
 
 var DefaultViews = []*view.View{
+	{
+		Measure:     LilyInfo,
+		Aggregation: view.LastValue(),
+		TagKeys:     []tag.Key{Version},
+	},
+	{
+		Measure:     TipSetWorkerConcurrency,
+		Aggregation: view.LastValue(),
+		TagKeys:     []tag.Key{},
+	},
+	{
+		Measure:     TipSetWorkerQueuePriority,
+		Aggregation: view.LastValue(),
+		TagKeys:     []tag.Key{QueueName},
+	},
 	{
 		Measure:     DataSourceActorStateChangesFastDiff,
 		Aggregation: view.Count(),
@@ -127,17 +159,6 @@ var DefaultViews = []*view.View{
 		Aggregation: view.Count(),
 		TagKeys:     []tag.Key{ConnState},
 	},
-	{
-		Measure:     LensRequestDuration,
-		Aggregation: defaultMillisecondsDistribution,
-		TagKeys:     []tag.Key{TaskType, API, ActorCode},
-	},
-	{
-		Name:        "lens_request_total",
-		Measure:     LensRequestDuration,
-		Aggregation: view.Count(),
-		TagKeys:     []tag.Key{TaskType, API, ActorCode},
-	},
 	{
 		Measure:     TipsetHeight,
 		Aggregation: view.LastValue(),
@@ -166,30 +187,34 @@ var DefaultViews = []*view.View{
 		Aggregation: view.Sum(),
 		TagKeys:     []tag.Key{Job},
 	},
-
+	{
+		Measure:     JobRunning,
+		Aggregation: view.Sum(),
+		TagKeys:     []tag.Key{Job, JobType},
+	},
 	{
 		Name:        JobStart.Name() + "_total",
 		Measure:     JobStart,
 		Aggregation: view.Count(),
-		TagKeys:     []tag.Key{Job},
+		TagKeys:     []tag.Key{Job, JobType},
 	},
 	{
 		Name:        JobComplete.Name() + "_total",
 		Measure:     JobComplete,
 		Aggregation: view.Count(),
-		TagKeys:     []tag.Key{Job},
+		TagKeys:     []tag.Key{Job, JobType},
 	},
 	{
 		Name:        JobError.Name() + "_total",
 		Measure:     JobError,
 		Aggregation: view.Count(),
-		TagKeys:     []tag.Key{Job},
+		TagKeys:     []tag.Key{Job, JobType},
 	},
 	{
 		Name:        JobTimeout.Name() + "_total",
 		Measure:     JobTimeout,
 		Aggregation: view.Count(),
-		TagKeys:     []tag.Key{Job},
+		TagKeys:     []tag.Key{Job, JobType},
 	},
 	{
diff --git a/schedule/scheduler.go b/schedule/scheduler.go
index 351439636..57d1234a4 100644
--- a/schedule/scheduler.go
+++ b/schedule/scheduler.go
@@ -388,6 +388,7 @@ func (s *Scheduler) Jobs() []JobListResult {
 func (s *Scheduler) execute(jc *JobConfig, complete chan struct{}) {
 	ctx, cancel := context.WithCancel(s.context)
 	ctx = metrics.WithTagValue(ctx, metrics.Job, jc.Name)
+	ctx = metrics.WithTagValue(ctx, metrics.JobType, jc.Type)
 
 	jc.lk.Lock()
 	jc.cancel = cancel
@@ -452,7 +453,9 @@ func (s *Scheduler) execute(jc *JobConfig, complete chan struct{}) {
 		}
 
 		metrics.RecordInc(ctx, metrics.JobStart)
+		metrics.RecordInc(ctx, metrics.JobRunning)
 		err := jc.Job.Run(ctx)
+		metrics.RecordDec(ctx, metrics.JobRunning)
 		if err != nil {
 			if errors.Is(err, context.Canceled) {
 				break