Skip to content

Commit

Permalink
feat(api): expose started and failed workflow runs metrics (#3373)
Browse files Browse the repository at this point in the history
  • Loading branch information
fsamin authored and richardlt committed Sep 28, 2018
1 parent fcfb482 commit b758517
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 13 deletions.
24 changes: 17 additions & 7 deletions engine/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,9 @@ type API struct {
warnChan chan sdk.Event
Cache cache.Store
Stats struct {
WorkflowRuns *stats.Int64Measure
Sessions *stats.Int64Measure
WorkflowRunFailed *stats.Int64Measure
WorkflowRunStarted *stats.Int64Measure
Sessions *stats.Int64Measure
}
}

Expand Down Expand Up @@ -739,16 +740,25 @@ func (a *API) Serve(ctx context.Context) error {
}

func (a *API) initStats() error {
label := fmt.Sprintf("cds/cds-api/%s/workflow_runs", a.Name)
a.Stats.WorkflowRuns = stats.Int64(label, "number of workflow runs", stats.UnitDimensionless)
label := fmt.Sprintf("cds/cds-api/%s/workflow_runs_started", a.Name)
a.Stats.WorkflowRunStarted = stats.Int64(label, "number of started workflow runs", stats.UnitDimensionless)

label = fmt.Sprintf("cds/cds-api/%s/workflow_runs_failed", a.Name)
a.Stats.WorkflowRunFailed = stats.Int64(label, "number of failed workflow runs", stats.UnitDimensionless)

log.Info("api> Stats initialized")

return observability.RegisterView(
&view.View{
Name: "workflow_runs",
Description: a.Stats.WorkflowRuns.Description(),
Measure: a.Stats.WorkflowRuns,
Name: "workflow_runs_started",
Description: a.Stats.WorkflowRunStarted.Description(),
Measure: a.Stats.WorkflowRunStarted,
Aggregation: view.Count(),
},
&view.View{
Name: "workflow_runs_failed",
Description: a.Stats.WorkflowRunFailed.Description(),
Measure: a.Stats.WorkflowRunFailed,
Aggregation: view.Count(),
},
)
Expand Down
3 changes: 3 additions & 0 deletions engine/api/observability/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ func RegisterView(views ...*view.View) error {
// StatsHandler returns a Handler to exposer prometheus views
func StatsHandler() service.Handler {
return func(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
if statsExporter == nil {
return nil
}
statsExporter.ServeHTTP(w, r)
return nil
}
Expand Down
8 changes: 6 additions & 2 deletions engine/api/workflow_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,12 +418,16 @@ func (api *API) postWorkflowJobResultHandler() service.Handler {
return sdk.WrapError(err, "postWorkflowJobResultHandler> unable to post job result")
}

observability.Record(ctx, api.Stats.WorkflowRunStarted, 1)
workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, proj.Key)

if len(workflowRuns) > 0 {
observability.Current(ctx,
observability.Tag(observability.TagWorkflow, workflowRuns[0].Workflow.Name),
)
observability.Tag(observability.TagWorkflow, workflowRuns[0].Workflow.Name))

if workflowRuns[0].Status == sdk.StatusFail.String() {
observability.Record(ctx, api.Stats.WorkflowRunFailed, 1)
}
}

db := api.mustDB()
Expand Down
30 changes: 26 additions & 4 deletions engine/api/workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,17 @@ func (api *API) stopWorkflowRunHandler() service.Handler {
workflowRuns, workflowNodeRuns := workflow.GetWorkflowRunEventData(report, proj.Key)
go workflow.SendEvent(api.mustDB(), workflowRuns, workflowNodeRuns, proj.Key)

if len(workflowRuns) > 0 {
observability.Current(ctx,
observability.Tag(observability.TagProjectKey, proj.Key),
observability.Tag(observability.TagWorkflow, workflowRuns[0].Workflow.Name),
)

if workflowRuns[0].Status == sdk.StatusFail.String() {
observability.Record(ctx, api.Stats.WorkflowRunFailed, 1)
}
}

return service.WriteJSON(w, run, http.StatusOK)
}
}
Expand Down Expand Up @@ -519,7 +530,7 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler {
return sdk.WrapError(err, "stopWorkflowNodeRunHandler> Unable to load last workflow run")
}

report, err := stopWorkflowNodeRun(ctx, api.mustDB, api.Cache, p, nodeRun, name, getUser(ctx))
report, err := api.stopWorkflowNodeRun(ctx, api.mustDB, api.Cache, p, nodeRun, name, getUser(ctx))
if err != nil {
return sdk.WrapError(err, "stopWorkflowNodeRunHandler> Unable to stop workflow run")
}
Expand All @@ -531,7 +542,7 @@ func (api *API) stopWorkflowNodeRunHandler() service.Handler {
}
}

func stopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, p *sdk.Project, nodeRun *sdk.WorkflowNodeRun, workflowName string, u *sdk.User) (*workflow.ProcessorReport, error) {
func (api *API) stopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store cache.Store, p *sdk.Project, nodeRun *sdk.WorkflowNodeRun, workflowName string, u *sdk.User) (*workflow.ProcessorReport, error) {
tx, errTx := dbFunc().Begin()
if errTx != nil {
return nil, sdk.WrapError(errTx, "stopWorkflowNodeRunHandler> Unable to create transaction")
Expand Down Expand Up @@ -560,6 +571,14 @@ func stopWorkflowNodeRun(ctx context.Context, dbFunc func() *gorp.DbMap, store c

_, _ = report.Merge(r1, nil)

observability.Current(ctx,
observability.Tag(observability.TagProjectKey, p.Key),
observability.Tag(observability.TagWorkflow, wr.Workflow.Name),
)
if wr.Status == sdk.StatusFail.String() {
observability.Record(ctx, api.Stats.WorkflowRunFailed, 1)
}

if errC := tx.Commit(); errC != nil {
return nil, sdk.WrapError(errC, "stopWorkflowNodeRunHandler> Unable to commit")
}
Expand Down Expand Up @@ -597,8 +616,11 @@ func (api *API) postWorkflowRunHandler() service.Handler {
name := vars["permWorkflowName"]
u := getUser(ctx)

observability.Current(ctx, observability.Tag(observability.TagWorkflow, name))
observability.Record(ctx, api.Stats.WorkflowRuns, 1)
observability.Current(ctx,
observability.Tag(observability.TagProjectKey, key),
observability.Tag(observability.TagWorkflow, name),
)
observability.Record(ctx, api.Stats.WorkflowRunStarted, 1)

_, next := observability.Span(ctx, "project.Load")
p, errP := project.Load(api.mustDB(), api.Cache, key, u,
Expand Down

0 comments on commit b758517

Please sign in to comment.