Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make NewRecorder a proper singleton #4210

Merged
merged 1 commit into from
Sep 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 63 additions & 51 deletions pkg/pipelinerunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
Expand Down Expand Up @@ -68,66 +69,77 @@ type Recorder struct {
ReportingPeriod time.Duration
}

// We cannot register the view multiple times, so NewRecorder lazily
// initializes this singleton and returns the same recorder across any
// subsequent invocations.
var (
once sync.Once
r *Recorder
recorderErr error
)

// NewRecorder creates a new metrics recorder instance
// to log the PipelineRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,
once.Do(func() {
r = &Recorder{
initialized: true,

// Default to 30s intervals.
ReportingPeriod: 30 * time.Second,
}

pipeline, err := tag.NewKey("pipeline")
if err != nil {
return nil, err
}
r.pipeline = pipeline
// Default to 30s intervals.
ReportingPeriod: 30 * time.Second,
}

pipelineRun, err := tag.NewKey("pipelinerun")
if err != nil {
return nil, err
}
r.pipelineRun = pipelineRun
pipeline, recorderErr := tag.NewKey("pipeline")
if recorderErr != nil {
return
}
r.pipeline = pipeline

namespace, err := tag.NewKey("namespace")
if err != nil {
return nil, err
}
r.namespace = namespace
pipelineRun, recorderErr := tag.NewKey("pipelinerun")
if recorderErr != nil {
return
}
r.pipelineRun = pipelineRun

status, err := tag.NewKey("status")
if err != nil {
return nil, err
}
r.status = status

err = view.Register(
&view.View{
Description: prDuration.Description(),
Measure: prDuration,
Aggregation: prDistributions,
TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status},
},
&view.View{
Description: prCount.Description(),
Measure: prCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningPRsCount.Description(),
Measure: runningPRsCount,
Aggregation: view.LastValue(),
},
)
namespace, recorderErr := tag.NewKey("namespace")
if recorderErr != nil {
return
}
r.namespace = namespace

if err != nil {
r.initialized = false
return r, err
}
status, recorderErr := tag.NewKey("status")
if recorderErr != nil {
return
}
r.status = status

recorderErr = view.Register(
&view.View{
Description: prDuration.Description(),
Measure: prDuration,
Aggregation: prDistributions,
TagKeys: []tag.Key{r.pipeline, r.pipelineRun, r.namespace, r.status},
},
&view.View{
Description: prCount.Description(),
Measure: prCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningPRsCount.Description(),
Measure: runningPRsCount,
Aggregation: view.LastValue(),
},
)

if recorderErr != nil {
r.initialized = false
return
}
})

return r, nil
return r, recorderErr
}

// DurationAndCount logs the duration of PipelineRun execution and
Expand Down
6 changes: 6 additions & 0 deletions pkg/pipelinerunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package pipelinerunmetrics

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -213,4 +214,9 @@ func TestRecordRunningPipelineRunsCount(t *testing.T) {

func unregisterMetrics() {
metricstest.Unregister("pipelinerun_duration_seconds", "pipelinerun_count", "running_pipelineruns_count")

// Allow the recorder singleton to be recreated.
once = sync.Once{}
r = nil
recorderErr = nil
}
179 changes: 96 additions & 83 deletions pkg/taskrunmetrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package taskrunmetrics
import (
"context"
"fmt"
"sync"
"time"

"github.com/pkg/errors"
Expand Down Expand Up @@ -78,102 +79,114 @@ type Recorder struct {
ReportingPeriod time.Duration
}

// We cannot register the view multiple times, so NewRecorder lazily
// initializes this singleton and returns the same recorder across any
// subsequent invocations.
var (
once sync.Once
r *Recorder
recorderErr error
)

// NewRecorder creates a new metrics recorder instance
// to log the TaskRun related metrics
func NewRecorder() (*Recorder, error) {
r := &Recorder{
initialized: true,
// Views cannot be registered multiple times, so recorder should be a singleton.
once.Do(func() {
r = &Recorder{
initialized: true,

// Default to reporting metrics every 30s.
ReportingPeriod: 30 * time.Second,
}
// Default to reporting metrics every 30s.
ReportingPeriod: 30 * time.Second,
}

task, err := tag.NewKey("task")
if err != nil {
return nil, err
}
r.task = task
task, recorderErr := tag.NewKey("task")
if recorderErr != nil {
return
}
r.task = task

taskRun, err := tag.NewKey("taskrun")
if err != nil {
return nil, err
}
r.taskRun = taskRun
taskRun, recorderErr := tag.NewKey("taskrun")
if recorderErr != nil {
return
}
r.taskRun = taskRun

namespace, err := tag.NewKey("namespace")
if err != nil {
return nil, err
}
r.namespace = namespace
namespace, recorderErr := tag.NewKey("namespace")
if recorderErr != nil {
return
}
r.namespace = namespace

status, err := tag.NewKey("status")
if err != nil {
return nil, err
}
r.status = status
status, recorderErr := tag.NewKey("status")
if recorderErr != nil {
return
}
r.status = status

pipeline, err := tag.NewKey("pipeline")
if err != nil {
return nil, err
}
r.pipeline = pipeline
pipeline, recorderErr := tag.NewKey("pipeline")
if recorderErr != nil {
return
}
r.pipeline = pipeline

pipelineRun, err := tag.NewKey("pipelinerun")
if err != nil {
return nil, err
}
r.pipelineRun = pipelineRun
pipelineRun, recorderErr := tag.NewKey("pipelinerun")
if recorderErr != nil {
return
}
r.pipelineRun = pipelineRun

pod, err := tag.NewKey("pod")
if err != nil {
return nil, err
}
r.pod = pod

err = view.Register(
&view.View{
Description: trDuration.Description(),
Measure: trDuration,
Aggregation: trDistribution,
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status},
},
&view.View{
Description: prTRDuration.Description(),
Measure: prTRDuration,
Aggregation: prTRLatencyDistribution,
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun},
},
&view.View{
Description: trCount.Description(),
Measure: trCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningTRsCount.Description(),
Measure: runningTRsCount,
Aggregation: view.LastValue(),
},
&view.View{
Description: podLatency.Description(),
Measure: podLatency,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.pod},
},
&view.View{
Description: cloudEvents.Description(),
Measure: cloudEvents,
Aggregation: view.Sum(),
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun},
},
)
pod, recorderErr := tag.NewKey("pod")
if recorderErr != nil {
return
}
r.pod = pod

recorderErr = view.Register(
&view.View{
Description: trDuration.Description(),
Measure: trDuration,
Aggregation: trDistribution,
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status},
},
&view.View{
Description: prTRDuration.Description(),
Measure: prTRDuration,
Aggregation: prTRLatencyDistribution,
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun},
},
&view.View{
Description: trCount.Description(),
Measure: trCount,
Aggregation: view.Count(),
TagKeys: []tag.Key{r.status},
},
&view.View{
Description: runningTRsCount.Description(),
Measure: runningTRsCount,
Aggregation: view.LastValue(),
},
&view.View{
Description: podLatency.Description(),
Measure: podLatency,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.pod},
},
&view.View{
Description: cloudEvents.Description(),
Measure: cloudEvents,
Aggregation: view.Sum(),
TagKeys: []tag.Key{r.task, r.taskRun, r.namespace, r.status, r.pipeline, r.pipelineRun},
},
)

if err != nil {
r.initialized = false
return r, err
}
if recorderErr != nil {
r.initialized = false
return
}
})

return r, nil
return r, recorderErr
}

// DurationAndCount logs the duration of TaskRun execution and
Expand Down
6 changes: 6 additions & 0 deletions pkg/taskrunmetrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package taskrunmetrics

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -453,4 +454,9 @@ func TestRecordCloudEvents(t *testing.T) {

func unregisterMetrics() {
metricstest.Unregister("taskrun_duration_seconds", "pipelinerun_taskrun_duration_seconds", "taskrun_count", "running_taskruns_count", "taskruns_pod_latency", "cloudevent_count")

// Allow the recorder singleton to be recreated.
once = sync.Once{}
r = nil
recorderErr = nil
}