Skip to content

Commit

Permalink
Merge pull request flyteorg#49 from lyft/trace-token-ages
Browse files Browse the repository at this point in the history
Emit request wait time (duration) as a metric
  • Loading branch information
bnsblue authored Jan 28, 2020
2 parents 682c509 + 092ffeb commit b5a3ed4
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 10 deletions.
2 changes: 1 addition & 1 deletion flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/hashicorp/golang-lru v0.5.4
github.com/json-iterator/go v1.1.9 // indirect
github.com/lyft/flyteidl v0.17.0
github.com/lyft/flytestdlib v0.3.0
github.com/lyft/flytestdlib v0.3.2
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
github.com/pkg/errors v0.9.1
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,8 @@ github.com/lyft/flyteidl v0.17.0 h1:Vsg38PGQAe+A1/9y6buFA5HoZO5vjAT7XNKrWokz084=
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI=
github.com/lyft/spark-on-k8s-operator v0.1.3/go.mod h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
Expand Down
3 changes: 1 addition & 2 deletions flyteplugins/go/tasks/pluginmachinery/core/exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ import (

"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
"github.com/lyft/flytestdlib/storage"
)

// An interface to access the TaskInformation
Expand Down
18 changes: 15 additions & 3 deletions flyteplugins/go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,18 +61,21 @@ type ExecutionState struct {

// In kicking off the Qubole command, this is the number of failures
CreationFailureCount int `json:"creation_failure_count,omitempty"`

// The time at which the execution first requested an allocation token
AllocationTokenRequestStartTime time.Time `json:"allocation_token_request_start_time,omitempty"`
}

// This is the main state iteration
func HandleExecutionState(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient,
executionsCache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error) {
executionsCache cache.AutoRefresh, cfg *config.Config, metrics QuboleHiveExecutorMetrics) (ExecutionState, error) {

var transformError error
var newState ExecutionState

switch currentState.Phase {
case PhaseNotStarted:
newState, transformError = GetAllocationToken(ctx, tCtx)
newState, transformError = GetAllocationToken(ctx, tCtx, currentState, metrics)

case PhaseQueued:
newState, transformError = KickOffQuery(ctx, tCtx, currentState, quboleClient, executionsCache, cfg)
Expand Down Expand Up @@ -150,7 +153,7 @@ func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx c
return core.ResourceNamespace(clusterPrimaryLabel), nil
}

func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext) (ExecutionState, error) {
func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, metric QuboleHiveExecutorMetrics) (ExecutionState, error) {
newState := ExecutionState{}
uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()

Expand All @@ -167,6 +170,15 @@ func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext) (Ex
}
logger.Infof(ctx, "Allocation result for [%s] is [%s]", uniqueId, allocationStatus)

// Emitting the duration this execution has been waiting for a token allocation
if currentState.AllocationTokenRequestStartTime.IsZero() {
newState.AllocationTokenRequestStartTime = time.Now()
} else {
newState.AllocationTokenRequestStartTime = currentState.AllocationTokenRequestStartTime
}
waitTime := time.Since(newState.AllocationTokenRequestStartTime)
metric.ResourceWaitTime.Observe(waitTime.Seconds())

if allocationStatus == core.AllocationStatusGranted {
newState.Phase = PhaseQueued
} else if allocationStatus == core.AllocationStatusExhausted {
Expand Down
52 changes: 49 additions & 3 deletions flyteplugins/go/tasks/plugins/hive/execution_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ import (
"context"
"net/url"
"testing"
"time"

"github.com/lyft/flytestdlib/contextutils"
"github.com/lyft/flytestdlib/promutils/labeled"

"github.com/lyft/flytestdlib/promutils"

idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins"
Expand All @@ -21,6 +27,10 @@ import (
"github.com/lyft/flyteplugins/go/tasks/plugins/hive/config"
)

// init sets the labeled-metrics key set to the namespace context key when the
// test package is loaded, so it runs before any test in this package builds
// metrics through getQuboleHiveExecutorMetrics.
// NOTE(review): presumably the labeled package requires its metric keys to be
// set before labeled metrics are constructed — confirm against flytestdlib.
func init() {
labeled.SetMetricKeys(contextutils.NamespaceKey)
}

func TestInTerminalState(t *testing.T) {
var stateTests = []struct {
phase ExecutionPhase
Expand Down Expand Up @@ -169,7 +179,9 @@ func TestGetAllocationToken(t *testing.T) {
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusGranted, nil)

state, err := GetAllocationToken(ctx, tCtx)
mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope())
state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics)
assert.NoError(t, err)
assert.Equal(t, PhaseQueued, state.Phase)
})
Expand All @@ -181,7 +193,9 @@ func TestGetAllocationToken(t *testing.T) {
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusExhausted, nil)

state, err := GetAllocationToken(ctx, tCtx)
mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope())
state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics)
assert.NoError(t, err)
assert.Equal(t, PhaseNotStarted, state.Phase)
})
Expand All @@ -193,10 +207,42 @@ func TestGetAllocationToken(t *testing.T) {
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusNamespaceQuotaExceeded, nil)

state, err := GetAllocationToken(ctx, tCtx)
mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()}
mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope())
state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics)
assert.NoError(t, err)
assert.Equal(t, PhaseNotStarted, state.Phase)
})

t.Run("Request start time, if empty in current state, should be set", func(t *testing.T) {
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusNamespaceQuotaExceeded, nil)

mockCurrentState := ExecutionState{}
mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope())
state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics)
assert.NoError(t, err)
assert.Equal(t, state.AllocationTokenRequestStartTime.IsZero(), false)
})

t.Run("Request start time, if already set in current state, should be maintained", func(t *testing.T) {
tCtx := GetMockTaskExecutionContext()
mockResourceManager := tCtx.ResourceManager()
x := mockResourceManager.(*mocks.ResourceManager)
x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything).
Return(core.AllocationStatusGranted, nil)

startTime := time.Now()
mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: startTime}
mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope())
state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics)
assert.NoError(t, err)
assert.Equal(t, state.AllocationTokenRequestStartTime.IsZero(), false)
assert.Equal(t, state.AllocationTokenRequestStartTime, startTime)
})
}

func TestAbort(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/hive/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionC

// Do what needs to be done, and give this function everything it needs to do its job properly
// TODO: Play around with making this return a transition directly. How will that pattern affect the multi-Qubole plugin?
outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, q.cfg)
outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, q.cfg, q.metrics)

// Return if there was an error
if transformError != nil {
Expand Down
8 changes: 8 additions & 0 deletions flyteplugins/go/tasks/plugins/hive/executor_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@ package hive
import (
"github.com/lyft/flytestdlib/promutils"
"github.com/lyft/flytestdlib/promutils/labeled"
"github.com/prometheus/client_golang/prometheus"
)

// QuboleHiveExecutorMetrics groups the metrics emitted by the Qubole Hive
// executor. Instances are built by getQuboleHiveExecutorMetrics.
type QuboleHiveExecutorMetrics struct {
// Scope is the metrics scope the collectors below are created under.
Scope promutils.Scope
// ReleaseResourceFailed — presumably counts failed attempts to release an
// allocated resource; its registration is not visible here, TODO confirm.
ReleaseResourceFailed labeled.Counter
// AllocationGranted counts allocation requests that were granted.
AllocationGranted labeled.Counter
// AllocationNotGranted counts allocation requests that did not fail but
// were not granted.
AllocationNotGranted labeled.Counter
// ResourceWaitTime summarizes how long an execution has been waiting for a
// resource allocation token; GetAllocationToken observes it in seconds.
ResourceWaitTime prometheus.Summary
}

var (
// tokenAgeObjectives is passed as the Objectives of the resource_wait_time
// summary. Following the Prometheus summary convention, each key is a
// quantile to report (median, p90, p99 and the maximum) and each value is
// the absolute error tolerated for that quantile.
tokenAgeObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001, 1.0: 0.0}
)

func getQuboleHiveExecutorMetrics(scope promutils.Scope) QuboleHiveExecutorMetrics {
return QuboleHiveExecutorMetrics{
Scope: scope,
Expand All @@ -21,5 +27,7 @@ func getQuboleHiveExecutorMetrics(scope promutils.Scope) QuboleHiveExecutorMetri
"Allocation request granted", scope),
AllocationNotGranted: labeled.NewCounter("allocation_not_granted",
"Allocation request did not fail but not granted", scope),
ResourceWaitTime: scope.MustNewSummaryWithOptions("resource_wait_time", "Duration the execution has been waiting for a resource allocation token",
promutils.SummaryOptions{Objectives: tokenAgeObjectives}),
}
}

0 comments on commit b5a3ed4

Please sign in to comment.