diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go index cb35ced99f..6bbd93c1db 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/executor.go @@ -34,6 +34,7 @@ type Executor struct { outputAssembler array.OutputAssembler errorAssembler array.OutputAssembler + metrics ExecutorMetrics } func (e Executor) GetID() string { @@ -74,12 +75,12 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c fallthrough case arrayCore.PhaseLaunch: - pluginState, err = LaunchSubTasks(ctx, tCtx, e.jobStore, pluginConfig, pluginState) + pluginState, err = LaunchSubTasks(ctx, tCtx, e.jobStore, pluginConfig, pluginState, e.metrics) case arrayCore.PhaseCheckingSubTaskExecutions: pluginState, err = CheckSubTasksState(ctx, tCtx.TaskExecutionMetadata(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), - e.jobStore, tCtx.DataStore(), pluginConfig, pluginState) + e.jobStore, tCtx.DataStore(), pluginConfig, pluginState, e.metrics) case arrayCore.PhaseAssembleFinalOutput: pluginState.State, err = array.AssembleFinalOutputs(ctx, e.outputAssembler, tCtx, arrayCore.PhaseSuccess, pluginState.State) @@ -120,11 +121,11 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c } func (e Executor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error { - return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Aborted") + return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Aborted", e.metrics) } func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error { - return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Finalized") + return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Finalized", e.metrics) } func NewExecutor(ctx context.Context, awsClient aws.Client, cfg *batchConfig.Config, @@ -164,6 +165,7 @@ func NewExecutor(ctx context.Context, awsClient aws.Client, cfg *batchConfig.Con jobDefinitionCache: definition.NewCache(cfg.JobDefCacheSize), outputAssembler: outputAssembler, errorAssembler: errorAssembler, + metrics: getAwsBatchExecutorMetrics(scope.NewSubScope("awsbatch")), }, nil } diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/executor_metrics.go b/flyteplugins/go/tasks/plugins/array/awsbatch/executor_metrics.go new file mode 100644 index 0000000000..7966f13df0 --- /dev/null +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/executor_metrics.go @@ -0,0 +1,28 @@ +package awsbatch + +import ( + "github.com/lyft/flytestdlib/promutils" + "github.com/lyft/flytestdlib/promutils/labeled" +) + +type ExecutorMetrics struct { + Scope promutils.Scope + SubTasksSubmitted labeled.Counter + SubTasksSucceeded labeled.Counter + SubTasksFailed labeled.Counter + BatchJobTerminated labeled.Counter +} + +func getAwsBatchExecutorMetrics(scope promutils.Scope) ExecutorMetrics { + return ExecutorMetrics{ + Scope: scope, + SubTasksSubmitted: labeled.NewCounter("sub_task_submitted", + "Sub tasks submitted", scope), + SubTasksSucceeded: labeled.NewCounter("batch_task_success", + "Batch tasks successful", scope), + SubTasksFailed: labeled.NewCounter("batch_task_failure", + "Batch tasks failure", scope), + BatchJobTerminated: labeled.NewCounter("batch_job_terminated", + "Batch job terminated", scope), + } +} diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/launcher.go b/flyteplugins/go/tasks/plugins/array/awsbatch/launcher.go index 63f09d1a49..0256357ed1 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/launcher.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/launcher.go @@ -16,8 +16,9 @@ import ( ) func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, pluginConfig *config.Config, - currentState *State) (nextState *State, err error) { - + currentState *State, metrics ExecutorMetrics) (nextState *State, err error) { + logger.Debugf(ctx, "Entering LaunchSubTasks ") + size := currentState.GetExecutionArraySize() if int64(currentState.GetExecutionArraySize()) > pluginConfig.MaxArrayJobSize { ee := fmt.Errorf("array size > max allowed. Requested [%v]. Allowed [%v]", currentState.GetExecutionArraySize(), pluginConfig.MaxArrayJobSize) logger.Info(ctx, ee) @@ -35,7 +36,6 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl return nil, err } - size := currentState.GetExecutionArraySize() t, err := tCtx.TaskReader().Read(ctx) if err != nil { return nil, err @@ -53,6 +53,9 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl return nil, err } + metrics.SubTasksSubmitted.Add(ctx, float64(size)) + logger.Debugf(ctx, "BatchTasks submitted") + parentState := currentState. SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, 0). SetArrayStatus(arraystatus.ArrayStatus{ @@ -71,7 +74,7 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl // Attempts to terminate the AWS Job if one is recorded in the pluginState. This API is idempotent and should be safe // to call multiple times on the same job. It'll result in multiple calls to AWS Batch in that case, however. -func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, reason string) error { +func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, reason string, metrics ExecutorMetrics) error { pluginState := &State{} if _, err := tCtx.PluginStateReader().Get(pluginState); err != nil { return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state") @@ -89,7 +92,11 @@ func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batc if pluginState.GetExternalJobID() != nil { jobID := *pluginState.GetExternalJobID() logger.Infof(ctx, "Cancelling AWS Job [%v] because [%v].", jobID, reason) - return batchClient.TerminateJob(ctx, jobID, reason) + err := batchClient.TerminateJob(ctx, jobID, reason) + if err != nil { + return err + } + metrics.BatchJobTerminated.Inc(ctx) } return nil diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/launcher_test.go b/flyteplugins/go/tasks/plugins/array/awsbatch/launcher_test.go index f45c1c3f5c..b35fa24d0c 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/launcher_test.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/launcher_test.go @@ -3,6 +3,8 @@ package awsbatch import ( "testing" + "github.com/lyft/flytestdlib/promutils" + "github.com/stretchr/testify/mock" "k8s.io/apimachinery/pkg/api/resource" @@ -122,7 +124,7 @@ func TestLaunchSubTasks(t *testing.T) { JobDefinitionArn: "arn", } - newState, err := LaunchSubTasks(context.TODO(), tCtx, batchClient, &config.Config{MaxArrayJobSize: 10}, currentState) + newState, err := LaunchSubTasks(context.TODO(), tCtx, batchClient, &config.Config{MaxArrayJobSize: 10}, currentState, getAwsBatchExecutorMetrics(promutils.NewTestScope())) assert.NoError(t, err) assertEqual(t, expectedState, newState) }) @@ -143,7 +145,7 @@ func TestTerminateSubTasks(t *testing.T) { batchClient.OnTerminateJob(ctx, "abc-123", "Test terminate").Return(nil).Once() t.Run("Simple", func(t *testing.T) { - assert.NoError(t, TerminateSubTasks(ctx, tCtx, batchClient, "Test terminate")) + assert.NoError(t, TerminateSubTasks(ctx, tCtx, batchClient, "Test terminate", getAwsBatchExecutorMetrics(promutils.NewTestScope()))) }) batchClient.AssertExpectations(t) diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go index c47bad8998..42eb1f8aa0 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor.go @@ -35,11 +35,10 @@ func createSubJobList(count int) []*Job { } func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata, outputPrefix, baseOutputSandbox storage.DataReference, jobStore *JobStore, - dataStore *storage.DataStore, cfg *config.Config, currentState *State) (newState *State, err error) { - + dataStore *storage.DataStore, cfg *config.Config, currentState *State, metrics ExecutorMetrics) (newState *State, err error) { + logger.Debugf(ctx, "Entering CheckSubTasksState ") newState = currentState parentState := currentState.State - jobName := taskMeta.GetTaskExecutionID().GetGeneratedName() job := jobStore.Get(jobName) // If job isn't currently being monitored (recovering from a restart?), add it to the sync-cache and return @@ -115,11 +114,16 @@ func CheckSubTasksState(ctx context.Context, taskMeta core.TaskExecutionMetadata parentState = parentState.SetArrayStatus(newArrayStatus) // Based on the summary produced above, deduce the overall phase of the task. phase := arrayCore.SummaryToPhase(ctx, currentState.GetOriginalMinSuccesses()-currentState.GetOriginalArraySize()+int64(currentState.GetExecutionArraySize()), newArrayStatus.Summary) + + if phase != arrayCore.PhaseCheckingSubTaskExecutions { + metrics.SubTasksSucceeded.Add(ctx, float64(newArrayStatus.Summary[core.PhaseSuccess])) + totalFailed := newArrayStatus.Summary[core.PhasePermanentFailure] + newArrayStatus.Summary[core.PhaseRetryableFailure] + metrics.SubTasksFailed.Add(ctx, float64(totalFailed)) + } if phase == arrayCore.PhaseWriteToDiscoveryThenFail { errorMsg := msg.Summary(cfg.MaxErrorStringLength) parentState = parentState.SetReason(errorMsg) } - if phase == arrayCore.PhaseCheckingSubTaskExecutions { newPhaseVersion := uint32(0) // For now, the only changes to PhaseVersion and PreviousSummary occur for running array jobs. diff --git a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go index a5ec131195..b3b9e5ba04 100644 --- a/flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go +++ b/flyteplugins/go/tasks/plugins/array/awsbatch/monitor_test.go @@ -61,7 +61,7 @@ func TestCheckSubTasksState(t *testing.T) { }, ExternalJobID: refStr("job-id"), JobDefinitionArn: "", - }) + }, getAwsBatchExecutorMetrics(promutils.NewTestScope())) assert.NoError(t, err) p, _ := newState.GetPhase() @@ -107,7 +107,7 @@ func TestCheckSubTasksState(t *testing.T) { }, ExternalJobID: refStr("job-id"), JobDefinitionArn: "", - }) + }, getAwsBatchExecutorMetrics(promutils.NewTestScope())) assert.NoError(t, err) p, _ := newState.GetPhase() @@ -150,7 +150,7 @@ func TestCheckSubTasksState(t *testing.T) { }, ExternalJobID: refStr("job-id"), JobDefinitionArn: "", - }) + }, getAwsBatchExecutorMetrics(promutils.NewTestScope())) assert.NoError(t, err) p, _ := newState.GetPhase() diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state.go b/flyteplugins/go/tasks/plugins/hive/execution_state.go index 5f4d1e3c2a..f4f511bfbb 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state.go @@ -203,10 +203,13 @@ func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, cur metric.ResourceWaitTime.Observe(waitTime.Seconds()) if allocationStatus == core.AllocationStatusGranted { + metric.AllocationGranted.Inc(ctx) newState.Phase = PhaseQueued } else if allocationStatus == core.AllocationStatusExhausted { + metric.AllocationNotGranted.Inc(ctx) newState.Phase = PhaseNotStarted } else if allocationStatus == core.AllocationStatusNamespaceQuotaExceeded { + metric.AllocationNotGranted.Inc(ctx) newState.Phase = PhaseNotStarted } else { return newState, errors.Errorf(errors.ResourceManagerFailure, "Got bad allocation result [%s] for token [%s]", @@ -411,7 +414,7 @@ func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState Exe return nil } -func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState) error { +func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState, metrics QuboleHiveExecutorMetrics) error { // Release allocation token uniqueID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() clusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx) @@ -422,9 +425,11 @@ func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionSt err = tCtx.ResourceManager().ReleaseResource(ctx, clusterPrimaryLabel, uniqueID) if err != nil { + metrics.ResourceReleaseFailed.Inc(ctx) logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueID, err) return err } + metrics.ResourceReleased.Inc(ctx) return nil } diff --git a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go index 6b59a75d13..2acdd6d9bb 100644 --- a/flyteplugins/go/tasks/plugins/hive/execution_state_test.go +++ b/flyteplugins/go/tasks/plugins/hive/execution_state_test.go @@ -290,7 +290,7 @@ func TestFinalize(t *testing.T) { called = true }).Return(nil) - err := Finalize(ctx, tCtx, state) + err := Finalize(ctx, tCtx, state, getQuboleHiveExecutorMetrics(promutils.NewTestScope())) assert.NoError(t, err) assert.True(t, called) } diff --git a/flyteplugins/go/tasks/plugins/hive/executor.go b/flyteplugins/go/tasks/plugins/hive/executor.go index 3fdf241301..556a301ab0 100644 --- a/flyteplugins/go/tasks/plugins/hive/executor.go +++ b/flyteplugins/go/tasks/plugins/hive/executor.go @@ -93,7 +93,7 @@ func (q QuboleHiveExecutor) Finalize(ctx context.Context, tCtx core.TaskExecutio return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state in Finalize") } - return Finalize(ctx, tCtx, incomingState) + return Finalize(ctx, tCtx, incomingState, q.metrics) } func (q QuboleHiveExecutor) GetProperties() core.PluginProperties { @@ -150,7 +150,7 @@ func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient return QuboleHiveExecutor{ id: quboleHiveExecutorID, cfg: cfg, - metrics: getQuboleHiveExecutorMetrics(scope), + metrics: getQuboleHiveExecutorMetrics(scope.NewSubScope("hive")), quboleClient: quboleClient, executionsCache: executionsAutoRefreshCache, }, nil diff --git a/flyteplugins/go/tasks/plugins/hive/executor_metrics.go b/flyteplugins/go/tasks/plugins/hive/executor_metrics.go index 431b18e933..519a13602a 100644 --- a/flyteplugins/go/tasks/plugins/hive/executor_metrics.go +++ b/flyteplugins/go/tasks/plugins/hive/executor_metrics.go @@ -8,7 +8,8 @@ import ( type QuboleHiveExecutorMetrics struct { Scope promutils.Scope - ReleaseResourceFailed labeled.Counter + ResourceReleased labeled.Counter + ResourceReleaseFailed labeled.Counter AllocationGranted labeled.Counter AllocationNotGranted labeled.Counter ResourceWaitTime prometheus.Summary @@ -21,12 +22,14 @@ var ( func getQuboleHiveExecutorMetrics(scope promutils.Scope) QuboleHiveExecutorMetrics { return QuboleHiveExecutorMetrics{ Scope: scope, - ReleaseResourceFailed: labeled.NewCounter("released_resource_failed", - "Error releasing allocation token", scope), - AllocationGranted: labeled.NewCounter("allocation_granted", - "Allocation request granted", scope), - AllocationNotGranted: labeled.NewCounter("allocation_not_granted", - "Allocation request did not fail but not granted", scope), + ResourceReleased: labeled.NewCounter("resource_release_success", + "Resource allocation token released", scope, labeled.EmitUnlabeledMetric), + ResourceReleaseFailed: labeled.NewCounter("resource_release_failed", + "Error releasing allocation token", scope, labeled.EmitUnlabeledMetric), + AllocationGranted: labeled.NewCounter("allocation_grant_success", + "Allocation request granted", scope, labeled.EmitUnlabeledMetric), + AllocationNotGranted: labeled.NewCounter("allocation_grant_failed", + "Allocation request did not fail but not granted", scope, labeled.EmitUnlabeledMetric), ResourceWaitTime: scope.MustNewSummaryWithOptions("resource_wait_time", "Duration the execution has been waiting for a resource allocation token", promutils.SummaryOptions{Objectives: tokenAgeObjectives}), } diff --git a/flyteplugins/go/tasks/plugins/presto/execution_state.go b/flyteplugins/go/tasks/plugins/presto/execution_state.go index e349956da3..8e917cef88 100644 --- a/flyteplugins/go/tasks/plugins/presto/execution_state.go +++ b/flyteplugins/go/tasks/plugins/presto/execution_state.go @@ -175,10 +175,13 @@ func GetAllocationToken( } if allocationStatus == core.AllocationStatusGranted { + metric.AllocationGranted.Inc(ctx) newState.CurrentPhase = PhaseQueued } else if allocationStatus == core.AllocationStatusExhausted { + metric.AllocationNotGranted.Inc(ctx) newState.CurrentPhase = PhaseNotStarted } else if allocationStatus == core.AllocationStatusNamespaceQuotaExceeded { + metric.AllocationNotGranted.Inc(ctx) newState.CurrentPhase = PhaseNotStarted } else { return newState, errors.Errorf(errors.ResourceManagerFailure, "Got bad allocation result [%s] for token [%s]", @@ -532,7 +535,7 @@ func Abort(ctx context.Context, currentState ExecutionState, client client.Prest return nil } -func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState) error { +func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState, metrics ExecutorMetrics) error { // Release allocation token uniqueID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName() routingGroup, err := composeResourceNamespaceWithRoutingGroup(ctx, tCtx) @@ -543,9 +546,11 @@ func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionSt err = tCtx.ResourceManager().ReleaseResource(ctx, routingGroup, uniqueID) if err != nil { + metrics.ResourceReleaseFailed.Inc(ctx) logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueID, err) return err } + metrics.ResourceReleased.Inc(ctx) return nil } diff --git a/flyteplugins/go/tasks/plugins/presto/execution_state_test.go b/flyteplugins/go/tasks/plugins/presto/execution_state_test.go index 32a0997467..e4a2c39e35 100644 --- a/flyteplugins/go/tasks/plugins/presto/execution_state_test.go +++ b/flyteplugins/go/tasks/plugins/presto/execution_state_test.go @@ -263,7 +263,7 @@ func TestFinalize(t *testing.T) { called = true }).Return(nil) - err := Finalize(ctx, tCtx, state) + err := Finalize(ctx, tCtx, state, getPrestoExecutorMetrics(promutils.NewTestScope())) assert.NoError(t, err) assert.True(t, called) } diff --git a/flyteplugins/go/tasks/plugins/presto/executor.go b/flyteplugins/go/tasks/plugins/presto/executor.go index 745ac7817f..3d34a6b99a 100644 --- a/flyteplugins/go/tasks/plugins/presto/executor.go +++ b/flyteplugins/go/tasks/plugins/presto/executor.go @@ -85,7 +85,7 @@ func (p Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state in Finalize") } - return Finalize(ctx, tCtx, incomingState) + return Finalize(ctx, tCtx, incomingState, p.metrics) } func (p Executor) GetProperties() core.PluginProperties { @@ -125,7 +125,8 @@ func NewPrestoExecutor( cfg *config.Config, prestoClient client.PrestoClient, scope promutils.Scope) (Executor, error) { - executionsAutoRefreshCache, err := NewPrestoExecutionsCache(ctx, prestoClient, cfg, scope.NewSubScope(prestoTaskType)) + subScope := scope.NewSubScope(prestoTaskType) + executionsAutoRefreshCache, err := NewPrestoExecutionsCache(ctx, prestoClient, cfg, subScope) if err != nil { logger.Errorf(ctx, "Failed to create AutoRefreshCache in Executor Setup. Error: %v", err) return Executor{}, err @@ -139,7 +140,7 @@ func NewPrestoExecutor( return Executor{ id: prestoPluginID, cfg: cfg, - metrics: getPrestoExecutorMetrics(scope), + metrics: getPrestoExecutorMetrics(subScope), prestoClient: prestoClient, executionsCache: executionsAutoRefreshCache, }, nil diff --git a/flyteplugins/go/tasks/plugins/presto/executor_metrics.go b/flyteplugins/go/tasks/plugins/presto/executor_metrics.go index 69538a757c..92a8fb4e71 100644 --- a/flyteplugins/go/tasks/plugins/presto/executor_metrics.go +++ b/flyteplugins/go/tasks/plugins/presto/executor_metrics.go @@ -7,7 +7,8 @@ import ( type ExecutorMetrics struct { Scope promutils.Scope - ReleaseResourceFailed labeled.Counter + ResourceReleased labeled.Counter + ResourceReleaseFailed labeled.Counter AllocationGranted labeled.Counter AllocationNotGranted labeled.Counter } @@ -15,11 +16,13 @@ type ExecutorMetrics struct { func getPrestoExecutorMetrics(scope promutils.Scope) ExecutorMetrics { return ExecutorMetrics{ Scope: scope, - ReleaseResourceFailed: labeled.NewCounter("presto_released_resource_failed", - "Error releasing allocation token for Presto", scope), - AllocationGranted: labeled.NewCounter("presto_allocation_granted", - "Allocation request granted for Presto", scope), - AllocationNotGranted: labeled.NewCounter("presto_allocation_not_granted", - "Allocation request did not fail but not granted for Presto", scope), + ResourceReleased: labeled.NewCounter("resource_release_success", + "Resource allocation token released", scope, labeled.EmitUnlabeledMetric), + ResourceReleaseFailed: labeled.NewCounter("resource_release_failed", + "Error releasing allocation token", scope, labeled.EmitUnlabeledMetric), + AllocationGranted: labeled.NewCounter("allocation_grant_success", + "Allocation request granted", scope, labeled.EmitUnlabeledMetric), + AllocationNotGranted: labeled.NewCounter("allocation_grant_failed", + "Allocation request did not fail but not granted", scope, labeled.EmitUnlabeledMetric), } }