From 22d28151d4db434cfbf2601870d60eba83306369 Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Wed, 1 Sep 2021 19:46:31 -0700 Subject: [PATCH] Make `NewRecorder` a proper singleton `NewRecorder` cannot be called twice safely because it would result in the same view being registered twice, which always returns an error. The tests work around this by themselves manually unregistering the metrics that `NewRecorder` registers, so an alternative here might be to have `NewRecorder` also return a `context.CancelFunc` to unregister things, or to have `NewRecorder` take `ctx` so that it can unregister the metrics on context-cancellation. This change simply opts for making `NewRecorder` a proper singleton with a `sync.Once` and private globals for the `Recorder` and the possible error, so multiple calls to `NewRecorder` return the same singleton `Recorder` (or its error). I've updated the tests to clear this new singleton state (as well as continuing to manually unregistering the metrics). --- pkg/pipelinerunmetrics/metrics.go | 114 +++++++++------- pkg/pipelinerunmetrics/metrics_test.go | 6 + pkg/taskrunmetrics/metrics.go | 179 +++++++++++++------------ pkg/taskrunmetrics/metrics_test.go | 6 + 4 files changed, 171 insertions(+), 134 deletions(-) diff --git a/pkg/pipelinerunmetrics/metrics.go b/pkg/pipelinerunmetrics/metrics.go index 2de49d6eb52..705fc564139 100644 --- a/pkg/pipelinerunmetrics/metrics.go +++ b/pkg/pipelinerunmetrics/metrics.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "sync" "time" "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" @@ -68,66 +69,77 @@ type Recorder struct { ReportingPeriod time.Duration } +// We cannot register the view multiple times, so NewRecorder lazily +// initializes this singleton and returns the same recorder across any +// subsequent invocations. +var ( + once sync.Once + r *Recorder + recorderErr error +) + // NewRecorder creates a new metrics recorder instance // to log the PipelineRun related metrics func NewRecorder() (*Recorder, error) { - r := &Recorder{ - initialized: true, + once.Do(func() { + r = &Recorder{ + initialized: true, - // Default to 30s intervals. - ReportingPeriod: 30 * time.Second, - } - - pipeline, err := tag.NewKey("pipeline") - if err != nil { - return nil, err - } - r.pipeline = pipeline + // Default to 30s intervals. + ReportingPeriod: 30 * time.Second, + } - pipelineRun, err := tag.NewKey("pipelinerun") - if err != nil { - return nil, err - } - r.pipelineRun = pipelineRun + pipeline, recorderErr := tag.NewKey("pipeline") + if recorderErr != nil { + return + } + r.pipeline = pipeline - namespace, err := tag.NewKey("namespace") - if err != nil { - return nil, err - } - r.namespace = namespace + pipelineRun, recorderErr := tag.NewKey("pipelinerun") + if recorderErr != nil { + return + } + r.pipelineRun = pipelineRun - status, err := tag.NewKey("status") - if err != nil { - return nil, err - } - r.status = status - - err = view.Register( - &view.View{ - Description: prDuration.Description(), - Measure: prDuration, - Aggregation: prDistributions, - TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status}, - }, - &view.View{ - Description: prCount.Description(), - Measure: prCount, - Aggregation: view.Count(), - TagKeys: []tag.Key{r.status}, - }, - &view.View{ - Description: runningPRsCount.Description(), - Measure: runningPRsCount, - Aggregation: view.LastValue(), - }, - ) + namespace, recorderErr := tag.NewKey("namespace") + if recorderErr != nil { + return + } + r.namespace = namespace - if err != nil { - r.initialized = false - return r, err - } + status, recorderErr := tag.NewKey("status") + if recorderErr != nil { + return + } + r.status = status + + recorderErr = view.Register( + &view.View{ + Description: prDuration.Description(), + Measure: prDuration, + Aggregation: prDistributions, + TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status}, + }, + &view.View{ + Description: prCount.Description(), + Measure: prCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.status}, + }, + &view.View{ + Description: runningPRsCount.Description(), + Measure: runningPRsCount, + Aggregation: view.LastValue(), + }, + ) + + if recorderErr != nil { + r.initialized = false + return + } + }) - return r, nil + return r, recorderErr } // DurationAndCount logs the duration of PipelineRun execution and diff --git a/pkg/pipelinerunmetrics/metrics_test.go b/pkg/pipelinerunmetrics/metrics_test.go index 1723ed50529..13436ccc89e 100644 --- a/pkg/pipelinerunmetrics/metrics_test.go +++ b/pkg/pipelinerunmetrics/metrics_test.go @@ -17,6 +17,7 @@ limitations under the License. package pipelinerunmetrics import ( + "sync" "testing" "time" @@ -213,4 +214,9 @@ func TestRecordRunningPipelineRunsCount(t *testing.T) { func unregisterMetrics() { metricstest.Unregister("pipelinerun_duration_seconds", "pipelinerun_count", "running_pipelineruns_count") + + // Allow the recorder singleton to be recreated. + once = sync.Once{} + r = nil + recorderErr = nil } diff --git a/pkg/taskrunmetrics/metrics.go b/pkg/taskrunmetrics/metrics.go index 42cb906b1f7..36b2fb33dde 100644 --- a/pkg/taskrunmetrics/metrics.go +++ b/pkg/taskrunmetrics/metrics.go @@ -19,6 +19,7 @@ package taskrunmetrics import ( "context" "fmt" + "sync" "time" "github.com/pkg/errors" @@ -78,102 +79,114 @@ type Recorder struct { ReportingPeriod time.Duration } +// We cannot register the view multiple times, so NewRecorder lazily +// initializes this singleton and returns the same recorder across any +// subsequent invocations. +var ( + once sync.Once + r *Recorder + recorderErr error +) + // NewRecorder creates a new metrics recorder instance // to log the TaskRun related metrics func NewRecorder() (*Recorder, error) { - r := &Recorder{ - initialized: true, + // Views cannot be registered multiple times, so recorder should be a singleton. + once.Do(func() { + r = &Recorder{ + initialized: true, - // Default to reporting metrics every 30s. - ReportingPeriod: 30 * time.Second, - } + // Default to reporting metrics every 30s. + ReportingPeriod: 30 * time.Second, + } - task, err := tag.NewKey("task") - if err != nil { - return nil, err - } - r.task = task + task, recorderErr := tag.NewKey("task") + if recorderErr != nil { + return + } + r.task = task - taskRun, err := tag.NewKey("taskrun") - if err != nil { - return nil, err - } - r.taskRun = taskRun + taskRun, recorderErr := tag.NewKey("taskrun") + if recorderErr != nil { + return + } + r.taskRun = taskRun - namespace, err := tag.NewKey("namespace") - if err != nil { - return nil, err - } - r.namespace = namespace + namespace, recorderErr := tag.NewKey("namespace") + if recorderErr != nil { + return + } + r.namespace = namespace - status, err := tag.NewKey("status") - if err != nil { - return nil, err - } - r.status = status + status, recorderErr := tag.NewKey("status") + if recorderErr != nil { + return + } + r.status = status - pipeline, err := tag.NewKey("pipeline") - if err != nil { - return nil, err - } - r.pipeline = pipeline + pipeline, recorderErr := tag.NewKey("pipeline") + if recorderErr != nil { + return + } + r.pipeline = pipeline - pipelineRun, err := tag.NewKey("pipelinerun") - if err != nil { - return nil, err - } - r.pipelineRun = pipelineRun + pipelineRun, recorderErr := tag.NewKey("pipelinerun") + if recorderErr != nil { + return + } + r.pipelineRun = pipelineRun - pod, err := tag.NewKey("pod") - if err != nil { - return nil, err - } - r.pod = pod - - err = view.Register( - &view.View{ - Description: trDuration.Description(), - Measure: trDuration, - Aggregation: trDistribution, - TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status}, - }, - &view.View{ - Description: prTRDuration.Description(), - Measure: prTRDuration, - Aggregation: prTRLatencyDistribution, - TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun}, - }, - &view.View{ - Description: trCount.Description(), - Measure: trCount, - Aggregation: view.Count(), - TagKeys: []tag.Key{r.status}, - }, - &view.View{ - Description: runningTRsCount.Description(), - Measure: runningTRsCount, - Aggregation: view.LastValue(), - }, - &view.View{ - Description: podLatency.Description(), - Measure: podLatency, - Aggregation: view.LastValue(), - TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.pod}, - }, - &view.View{ - Description: cloudEvents.Description(), - Measure: cloudEvents, - Aggregation: view.Sum(), - TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun}, - }, - ) + pod, recorderErr := tag.NewKey("pod") + if recorderErr != nil { + return + } + r.pod = pod + + recorderErr = view.Register( + &view.View{ + Description: trDuration.Description(), + Measure: trDuration, + Aggregation: trDistribution, + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status}, + }, + &view.View{ + Description: prTRDuration.Description(), + Measure: prTRDuration, + Aggregation: prTRLatencyDistribution, + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun}, + }, + &view.View{ + Description: trCount.Description(), + Measure: trCount, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.status}, + }, + &view.View{ + Description: runningTRsCount.Description(), + Measure: runningTRsCount, + Aggregation: view.LastValue(), + }, + &view.View{ + Description: podLatency.Description(), + Measure: podLatency, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.pod}, + }, + &view.View{ + Description: cloudEvents.Description(), + Measure: cloudEvents, + Aggregation: view.Sum(), + TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun}, + }, + ) - if err != nil { - r.initialized = false - return r, err - } + if recorderErr != nil { + r.initialized = false + return + } + }) - return r, nil + return r, recorderErr } // DurationAndCount logs the duration of TaskRun execution and diff --git a/pkg/taskrunmetrics/metrics_test.go b/pkg/taskrunmetrics/metrics_test.go index ea129c3aecf..492cae870e3 100644 --- a/pkg/taskrunmetrics/metrics_test.go +++ b/pkg/taskrunmetrics/metrics_test.go @@ -17,6 +17,7 @@ limitations under the License. package taskrunmetrics import ( + "sync" "testing" "time" @@ -453,4 +454,9 @@ func TestRecordCloudEvents(t *testing.T) { func unregisterMetrics() { metricstest.Unregister("taskrun_duration_seconds", "pipelinerun_taskrun_duration_seconds", "taskrun_count", "running_taskruns_count", "taskruns_pod_latency", "cloudevent_count") + + // Allow the recorder singleton to be recreated. + once = sync.Once{} + r = nil + recorderErr = nil }