diff --git a/go.mod b/go.mod index 092214180f..3a14c6007b 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/json-iterator/go v1.1.9 // indirect github.com/lyft/flyteidl v0.17.0 - github.com/lyft/flytestdlib v0.3.0 + github.com/lyft/flytestdlib v0.3.2 github.com/magiconair/properties v1.8.1 github.com/mitchellh/mapstructure v1.1.2 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index f12d0cedd6..d2168fdadf 100644 --- a/go.sum +++ b/go.sum @@ -295,6 +295,8 @@ github.com/lyft/flyteidl v0.17.0 h1:Vsg38PGQAe+A1/9y6buFA5HoZO5vjAT7XNKrWokz084= github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20= github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU= github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= +github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw= +github.com/lyft/flytestdlib v0.3.2/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU= github.com/lyft/spark-on-k8s-operator v0.1.3 h1:rmke8lR2Oy8mvKXRhloKuEu7fgGuXepDxiBNiorVUFI= github.com/lyft/spark-on-k8s-operator v0.1.3/go.mod h1:hkRqdqAsdNnxT/Zst6MNMRbTAoiCZ0JRw7svRgAYb0A= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= diff --git a/go/tasks/pluginmachinery/core/exec_context.go b/go/tasks/pluginmachinery/core/exec_context.go index d060f606cd..4212320c22 100644 --- a/go/tasks/pluginmachinery/core/exec_context.go +++ b/go/tasks/pluginmachinery/core/exec_context.go @@ -5,9 +5,8 @@ import ( "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/catalog" - "github.com/lyft/flytestdlib/storage" - "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/lyft/flytestdlib/storage" ) // An interface to access the TaskInformation diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go index 5ff0f2b76c..4246d12933 100644 --- a/go/tasks/plugins/hive/execution_state.go +++ b/go/tasks/plugins/hive/execution_state.go @@ -61,18 +61,21 @@ type ExecutionState struct { // In kicking off the Qubole command, this is the number of failures CreationFailureCount int `json:"creation_failure_count,omitempty"` + + // The time the execution first requests for an allocation token + AllocationTokenRequestStartTime time.Time `json:"allocation_token_request_start_time,omitempty"` } // This is the main state iteration func HandleExecutionState(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient, - executionsCache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error) { + executionsCache cache.AutoRefresh, cfg *config.Config, metrics QuboleHiveExecutorMetrics) (ExecutionState, error) { var transformError error var newState ExecutionState switch currentState.Phase { case PhaseNotStarted: - newState, transformError = GetAllocationToken(ctx, tCtx) + newState, transformError = GetAllocationToken(ctx, tCtx, currentState, metrics) case PhaseQueued: newState, transformError = KickOffQuery(ctx, tCtx, currentState, quboleClient, executionsCache, cfg) @@ -150,7 +153,7 @@ func composeResourceNamespaceWithClusterPrimaryLabel(ctx context.Context, tCtx c return core.ResourceNamespace(clusterPrimaryLabel), nil } -func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext) (ExecutionState, error) { +func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, metric QuboleHiveExecutorMetrics) (ExecutionState, error) { newState := ExecutionState{} uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() @@ -167,6 +170,15 @@ func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext) (Ex } logger.Infof(ctx, "Allocation result for [%s] is [%s]", uniqueId, allocationStatus) + // Emitting the duration this execution has been waiting for a token allocation + if currentState.AllocationTokenRequestStartTime.IsZero() { + newState.AllocationTokenRequestStartTime = time.Now() + } else { + newState.AllocationTokenRequestStartTime = currentState.AllocationTokenRequestStartTime + } + waitTime := time.Since(newState.AllocationTokenRequestStartTime) + metric.ResourceWaitTime.Observe(waitTime.Seconds()) + if allocationStatus == core.AllocationStatusGranted { newState.Phase = PhaseQueued } else if allocationStatus == core.AllocationStatusExhausted { diff --git a/go/tasks/plugins/hive/execution_state_test.go b/go/tasks/plugins/hive/execution_state_test.go index ef68153459..eadd7f9512 100644 --- a/go/tasks/plugins/hive/execution_state_test.go +++ b/go/tasks/plugins/hive/execution_state_test.go @@ -4,6 +4,12 @@ import ( "context" "net/url" "testing" + "time" + + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils/labeled" + + "github.com/lyft/flytestdlib/promutils" idlCore "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/plugins" @@ -21,6 +27,10 @@ import ( "github.com/lyft/flyteplugins/go/tasks/plugins/hive/config" ) +func init() { + labeled.SetMetricKeys(contextutils.NamespaceKey) +} + func TestInTerminalState(t *testing.T) { var stateTests = []struct { phase ExecutionPhase @@ -169,7 +179,9 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusGranted, nil) - state, err := GetAllocationToken(ctx, tCtx) + mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()} + mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope()) + state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics) assert.NoError(t, err) assert.Equal(t, PhaseQueued, state.Phase) }) @@ -181,7 +193,9 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusExhausted, nil) - state, err := GetAllocationToken(ctx, tCtx) + mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()} + mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope()) + state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics) assert.NoError(t, err) assert.Equal(t, PhaseNotStarted, state.Phase) }) @@ -193,10 +207,42 @@ func TestGetAllocationToken(t *testing.T) { x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). Return(core.AllocationStatusNamespaceQuotaExceeded, nil) - state, err := GetAllocationToken(ctx, tCtx) + mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: time.Now()} + mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope()) + state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics) assert.NoError(t, err) assert.Equal(t, PhaseNotStarted, state.Phase) }) + + t.Run("Request start time, if empty in current state, should be set", func(t *testing.T) { + tCtx := GetMockTaskExecutionContext() + mockResourceManager := tCtx.ResourceManager() + x := mockResourceManager.(*mocks.ResourceManager) + x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). + Return(core.AllocationStatusNamespaceQuotaExceeded, nil) + + mockCurrentState := ExecutionState{} + mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope()) + state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics) + assert.NoError(t, err) + assert.Equal(t, state.AllocationTokenRequestStartTime.IsZero(), false) + }) + + t.Run("Request start time, if already set in current state, should be maintained", func(t *testing.T) { + tCtx := GetMockTaskExecutionContext() + mockResourceManager := tCtx.ResourceManager() + x := mockResourceManager.(*mocks.ResourceManager) + x.On("AllocateResource", mock.Anything, mock.Anything, mock.Anything). + Return(core.AllocationStatusGranted, nil) + + startTime := time.Now() + mockCurrentState := ExecutionState{AllocationTokenRequestStartTime: startTime} + mockMetrics := getQuboleHiveExecutorMetrics(promutils.NewTestScope()) + state, err := GetAllocationToken(ctx, tCtx, mockCurrentState, mockMetrics) + assert.NoError(t, err) + assert.Equal(t, state.AllocationTokenRequestStartTime.IsZero(), false) + assert.Equal(t, state.AllocationTokenRequestStartTime, startTime) + }) } func TestAbort(t *testing.T) { diff --git a/go/tasks/plugins/hive/executor.go b/go/tasks/plugins/hive/executor.go index b696069a72..a00398326c 100644 --- a/go/tasks/plugins/hive/executor.go +++ b/go/tasks/plugins/hive/executor.go @@ -51,7 +51,7 @@ func (q QuboleHiveExecutor) Handle(ctx context.Context, tCtx core.TaskExecutionC // Do what needs to be done, and give this function everything it needs to do its job properly // TODO: Play around with making this return a transition directly. How will that pattern affect the multi-Qubole plugin - outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, q.cfg) + outgoingState, transformError := HandleExecutionState(ctx, tCtx, incomingState, q.quboleClient, q.executionsCache, q.cfg, q.metrics) // Return if there was an error if transformError != nil { diff --git a/go/tasks/plugins/hive/executor_metrics.go b/go/tasks/plugins/hive/executor_metrics.go index 551d30c3aa..431b18e933 100644 --- a/go/tasks/plugins/hive/executor_metrics.go +++ b/go/tasks/plugins/hive/executor_metrics.go @@ -3,6 +3,7 @@ package hive import ( "github.com/lyft/flytestdlib/promutils" "github.com/lyft/flytestdlib/promutils/labeled" + "github.com/prometheus/client_golang/prometheus" ) type QuboleHiveExecutorMetrics struct { @@ -10,8 +11,13 @@ type QuboleHiveExecutorMetrics struct { ReleaseResourceFailed labeled.Counter AllocationGranted labeled.Counter AllocationNotGranted labeled.Counter + ResourceWaitTime prometheus.Summary } +var ( + tokenAgeObjectives = map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001, 1.0: 0.0} +) + func getQuboleHiveExecutorMetrics(scope promutils.Scope) QuboleHiveExecutorMetrics { return QuboleHiveExecutorMetrics{ Scope: scope, @@ -21,5 +27,7 @@ func getQuboleHiveExecutorMetrics(scope promutils.Scope) QuboleHiveExecutorMetri "Allocation request granted", scope), AllocationNotGranted: labeled.NewCounter("allocation_not_granted", "Allocation request did not fail but not granted", scope), + ResourceWaitTime: scope.MustNewSummaryWithOptions("resource_wait_time", "Duration the execution has been waiting for a resource allocation token", + promutils.SummaryOptions{Objectives: tokenAgeObjectives}), } }