
Merge pull request #13 from lyft/more-metrics
Metrics captured for offending workflows
Ketan Umare authored Sep 18, 2019
2 parents b16160c + c20ec36 commit 6fcd80b
Showing 3 changed files with 37 additions and 21 deletions.
30 changes: 16 additions & 14 deletions flytepropeller/pkg/controller/handler.go
@@ -6,10 +6,12 @@ import (
 	"runtime/debug"
 	"time"
 
+	"github.com/lyft/flytestdlib/contextutils"
+	"github.com/lyft/flytestdlib/promutils/labeled"
+
 	"github.com/lyft/flytepropeller/pkg/controller/config"
 	"github.com/lyft/flytepropeller/pkg/controller/workflowstore"
 
-	"github.com/lyft/flytestdlib/contextutils"
 	"github.com/lyft/flytestdlib/logger"
 	"github.com/lyft/flytestdlib/promutils"
 	"github.com/prometheus/client_golang/prometheus"
@@ -22,10 +24,10 @@ import (
 type propellerMetrics struct {
 	Scope                    promutils.Scope
 	DeepCopyTime             promutils.StopWatch
-	RawWorkflowTraversalTime promutils.StopWatch
-	SystemError              prometheus.Counter
-	AbortError               prometheus.Counter
-	PanicObserved            prometheus.Counter
+	RawWorkflowTraversalTime labeled.StopWatch
+	SystemError              labeled.Counter
+	AbortError               labeled.Counter
+	PanicObserved            labeled.Counter
 	RoundSkipped             prometheus.Counter
 	WorkflowNotFound         prometheus.Counter
 }
@@ -35,10 +37,10 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics {
 	return &propellerMetrics{
 		Scope:                    scope,
 		DeepCopyTime:             roundScope.MustNewStopWatch("deepcopy", "Total time to deep copy wf object", time.Millisecond),
-		RawWorkflowTraversalTime: roundScope.MustNewStopWatch("raw", "Total time to traverse the workflow", time.Millisecond),
-		SystemError:              roundScope.MustNewCounter("system_error", "Failure to reconcile a workflow, system error"),
-		AbortError:               roundScope.MustNewCounter("abort_error", "Failure to abort a workflow, system error"),
-		PanicObserved:            roundScope.MustNewCounter("panic", "Panic during handling or aborting workflow"),
+		RawWorkflowTraversalTime: labeled.NewStopWatch("raw", "Total time to traverse the workflow", time.Millisecond, roundScope, labeled.EmitUnlabeledMetric),
+		SystemError:              labeled.NewCounter("system_error", "Failure to reconcile a workflow, system error", roundScope, labeled.EmitUnlabeledMetric),
+		AbortError:               labeled.NewCounter("abort_error", "Failure to abort a workflow, system error", roundScope, labeled.EmitUnlabeledMetric),
+		PanicObserved:            labeled.NewCounter("panic", "Panic during handling or aborting workflow", roundScope, labeled.EmitUnlabeledMetric),
 		RoundSkipped:             roundScope.MustNewCounter("skipped", "Round Skipped because of stale workflow"),
 		WorkflowNotFound:         roundScope.MustNewCounter("not_found", "workflow not found in the cache"),
 	}
@@ -111,13 +113,13 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
 				if r := recover(); r != nil {
 					stack := debug.Stack()
 					err = fmt.Errorf("panic when aborting workflow, Stack: [%s]", string(stack))
-					p.metrics.PanicObserved.Inc()
+					p.metrics.PanicObserved.Inc(ctx)
 				}
 			}()
 			err = p.workflowExecutor.HandleAbortedWorkflow(ctx, wfDeepCopy, maxRetries)
 		}()
 		if err != nil {
-			p.metrics.AbortError.Inc()
+			p.metrics.AbortError.Inc(ctx)
 			return err
 		}
 	} else {
@@ -133,13 +135,13 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
 	SetFinalizerIfEmpty(wfDeepCopy, FinalizerKey)
 
 	func() {
-		t := p.metrics.RawWorkflowTraversalTime.Start()
+		t := p.metrics.RawWorkflowTraversalTime.Start(ctx)
 		defer func() {
 			t.Stop()
 			if r := recover(); r != nil {
 				stack := debug.Stack()
 				err = fmt.Errorf("panic when aborting workflow, Stack: [%s]", string(stack))
-				p.metrics.PanicObserved.Inc()
+				p.metrics.PanicObserved.Inc(ctx)
 			}
 		}()
 		err = p.workflowExecutor.HandleFlyteWorkflow(ctx, wfDeepCopy)
@@ -152,7 +154,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error {
 		wfDeepCopy = w.DeepCopy()
 		wfDeepCopy.GetExecutionStatus().IncFailedAttempts()
 		wfDeepCopy.GetExecutionStatus().SetMessage(err.Error())
-		p.metrics.SystemError.Inc()
+		p.metrics.SystemError.Inc(ctx)
 	} else {
 		// No updates in the status we detected, we will skip writing to KubeAPI
 		if wfDeepCopy.Status.Equals(&w.Status) {
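The handler.go change above is the core of this commit: the round-level metrics move from plain promutils/prometheus metrics to flytestdlib's labeled package, so every Inc and Start call is attributed to the workflow identified in the context. A minimal sketch of that usage pattern follows; the labeled API calls mirror the ones imported in the diff, while SetMetricKeys, the contextutils.With* helpers, the scope name, and the project/domain/workflow values are assumed setup for illustration and are not part of this diff.

package main

import (
	"context"
	"time"

	"github.com/lyft/flytestdlib/contextutils"
	"github.com/lyft/flytestdlib/promutils"
	"github.com/lyft/flytestdlib/promutils/labeled"
)

func main() {
	// Labeled metrics resolve their label values from the context, so the set of
	// label keys has to be registered once, before any labeled metric is created
	// (assumed to happen once at startup; it is not part of this diff).
	labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey)

	scope := promutils.NewScope("propeller_example") // illustrative scope name
	panics := labeled.NewCounter("panic", "Panic during handling or aborting workflow", scope, labeled.EmitUnlabeledMetric)
	traversal := labeled.NewStopWatch("raw", "Total time to traverse the workflow", time.Millisecond, scope, labeled.EmitUnlabeledMetric)

	// Tag the context with the identifiers of the workflow being handled; every
	// Inc(ctx)/Start(ctx) call is then recorded against those label values, which
	// is what lets an offending workflow be singled out in the emitted metrics.
	ctx := contextutils.WithProjectDomain(context.TODO(), "myproject", "development")
	ctx = contextutils.WithWorkflowID(ctx, "my-workflow")

	t := traversal.Start(ctx)
	// ... one round of workflow traversal would run here ...
	t.Stop()

	panics.Inc(ctx) // counted under the workflow's label values
}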
22 changes: 15 additions & 7 deletions flytepropeller/pkg/controller/nodes/executor.go
@@ -36,7 +36,9 @@ type nodeMetrics struct {
 	TransitionLatency labeled.StopWatch
 	// Measures the latency between the time a node's been queued to the time the handler reported the executable moved
 	// to running state
-	QueuingLatency labeled.StopWatch
+	QueuingLatency         labeled.StopWatch
+	NodeExecutionTime      labeled.StopWatch
+	NodeInputGatherLatency labeled.StopWatch
 }
 
 type nodeExecutor struct {
@@ -103,6 +105,8 @@ func (c *nodeExecutor) startNode(ctx context.Context, w v1alpha1.ExecutableWorkf
 	dataDir := nodeStatus.GetDataDir()
 	var nodeInputs *handler.Data
 	if !node.IsStartNode() {
+		t := c.metrics.NodeInputGatherLatency.Start(ctx)
+		defer t.Stop()
 		// Can execute
 		var err error
 		nodeInputs, err = Resolve(ctx, c.nodeHandlerFactory, w, node.GetID(), node.GetInputBindings(), c.store)
@@ -455,6 +459,8 @@ func (c *nodeExecutor) RecursiveNodeHandler(ctx context.Context, w v1alpha1.Exec
 	switch nodeStatus.GetPhase() {
 	case v1alpha1.NodePhaseNotYetStarted, v1alpha1.NodePhaseQueued, v1alpha1.NodePhaseRunning, v1alpha1.NodePhaseFailing, v1alpha1.NodePhaseRetryableFailure, v1alpha1.NodePhaseSucceeding:
 		logger.Debugf(currentNodeCtx, "Handling node Status [%v]", nodeStatus.GetPhase().String())
+		t := c.metrics.NodeExecutionTime.Start(ctx)
+		defer t.Stop()
 		return c.executeNode(currentNodeCtx, w, currentNode)
 	// TODO we can optimize skip state handling by iterating down the graph and marking all as skipped
 	// Currently we treat either Skip or Success the same way. In this approach only one node will be skipped
@@ -515,12 +521,14 @@ func NewExecutor(ctx context.Context, store *storage.DataStore, enQWorkflow v1al
 		enqueueWorkflow: enQWorkflow,
 		nodeRecorder:    events.NewNodeEventRecorder(eventSink, nodeScope),
 		metrics: &nodeMetrics{
-			FailureDuration:    labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			SuccessDuration:    labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			InputsWriteFailure: labeled.NewCounter("inputs_write_fail", "Indicates failure in writing node inputs to metastore", nodeScope),
-			ResolutionFailure:  labeled.NewCounter("input_resolve_fail", "Indicates failure in resolving node inputs", nodeScope),
-			TransitionLatency:  labeled.NewStopWatch("transition_latency", "Measures the latency between the last parent node stoppedAt time and current node's queued time.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
-			QueuingLatency:     labeled.NewStopWatch("queueing_latency", "Measures the latency between the time a node's been queued to the time the handler reported the executable moved to running state", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			FailureDuration:        labeled.NewStopWatch("failure_duration", "Indicates the total execution time of a failed workflow.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			SuccessDuration:        labeled.NewStopWatch("success_duration", "Indicates the total execution time of a successful workflow.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			InputsWriteFailure:     labeled.NewCounter("inputs_write_fail", "Indicates failure in writing node inputs to metastore", nodeScope),
+			ResolutionFailure:      labeled.NewCounter("input_resolve_fail", "Indicates failure in resolving node inputs", nodeScope),
+			TransitionLatency:      labeled.NewStopWatch("transition_latency", "Measures the latency between the last parent node stoppedAt time and current node's queued time.", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			QueuingLatency:         labeled.NewStopWatch("queueing_latency", "Measures the latency between the time a node's been queued to the time the handler reported the executable moved to running state", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
+			NodeExecutionTime:      labeled.NewStopWatch("node_exec_latency", "Measures the time taken to execute one node, a node can be complex so it may encompass sub-node latency.", time.Microsecond, nodeScope, labeled.EmitUnlabeledMetric),
+			NodeInputGatherLatency: labeled.NewStopWatch("node_input_latency", "Measures the latency to aggregate inputs and check readiness of a node", time.Millisecond, nodeScope, labeled.EmitUnlabeledMetric),
 		},
 	}
 	nodeHandlerFactory, err := NewHandlerFactory(
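Both new node-level stopwatches follow the same Start(ctx) / defer t.Stop() shape seen above. Because a deferred Stop() runs when the enclosing function returns rather than when the surrounding block ends, the observed duration extends to the function's return. A small sketch of the pattern; the helper name and parameters are hypothetical, only the labeled.StopWatch usage mirrors the diff.

package example

import (
	"context"

	"github.com/lyft/flytestdlib/promutils/labeled"
)

// timeSection is a hypothetical helper showing the Start(ctx)/defer Stop()
// pattern added in startNode and RecursiveNodeHandler above.
func timeSection(ctx context.Context, sw *labeled.StopWatch, gatherInputs bool) error {
	if gatherInputs {
		t := sw.Start(ctx)
		// The deferred Stop() fires when timeSection returns, not at the end of
		// this if block, so the watch also covers the work done below it.
		defer t.Stop()
	}
	// ... resolve inputs and execute the node here ...
	return nil
}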
6 changes: 6 additions & 0 deletions flytepropeller/pkg/controller/nodes/task/handler.go
@@ -126,6 +126,7 @@ type metrics struct {
 	discoveryGetFailureCount labeled.Counter
 	discoveryMissCount       labeled.Counter
 	discoveryHitCount        labeled.Counter
+	pluginExecutionLatency   labeled.StopWatch
 
 	// TODO We should have a metric to capture custom state size
 }
@@ -255,7 +256,9 @@ func (h *taskHandler) StartNode(ctx context.Context, w v1alpha1.ExecutableWorkfl
 				logger.Errorf(ctx, "Panic in plugin for TaskType [%s]", task.TaskType())
 			}
 		}()
+		t := h.metrics.pluginExecutionLatency.Start(ctx)
 		taskStatus, err = t.StartTask(ctx, taskCtx, task.CoreTask(), nodeInputs)
+		t.Stop()
 	}()
 
 	if err != nil {
@@ -306,7 +309,9 @@ func (h *taskHandler) CheckNodeStatus(ctx context.Context, w v1alpha1.Executable
 				logger.Errorf(ctx, "Panic in plugin for TaskType [%s]", task.TaskType())
 			}
 		}()
+		t := h.metrics.pluginExecutionLatency.Start(ctx)
 		taskStatus, err = t.CheckTaskStatus(ctx, taskCtx, task.CoreTask())
+		t.Stop()
 	}()
 
 	if err != nil {
@@ -428,6 +433,7 @@ func NewTaskHandlerForFactory(eventSink events.EventSink, store *storage.DataSto
 			discoveryMissCount:       labeled.NewCounter("discovery_miss_count", "Task not cached in Discovery", scope),
 			discoveryPutFailureCount: labeled.NewCounter("discovery_put_failure_count", "Discovery Put failure count", scope),
 			discoveryGetFailureCount: labeled.NewCounter("discovery_get_failure_count", "Discovery Get failure count", scope),
+			pluginExecutionLatency:   labeled.NewStopWatch("plugin_exec_latecny", "Time taken to invoke plugin for one round", time.Microsecond, scope),
 		},
 	}
 }
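pluginExecutionLatency brackets both plugin entry points (StartTask and CheckTaskStatus) with a Start(ctx) before the call and a Stop() right after. A sketch of that wrapping pulled out into a standalone helper; the helper name and the func-valued parameter are illustrative and not part of the commit, and the timer deliberately gets its own name so it cannot collide with other locals in the caller.

package example

import (
	"context"

	"github.com/lyft/flytestdlib/promutils/labeled"
)

// timePluginCall mirrors the instrumentation added above: start the labeled
// stopwatch, invoke one plugin round, stop the watch, and hand back the error.
func timePluginCall(ctx context.Context, sw *labeled.StopWatch, invoke func(ctx context.Context) error) error {
	timer := sw.Start(ctx) // label values come from what is stored in ctx
	err := invoke(ctx)
	timer.Stop()
	return err
}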
