From 2eb2f8c1a3009a8175a36b94ac3ce2eb6bd51226 Mon Sep 17 00:00:00 2001
From: Ketan Umare
Date: Fri, 6 Mar 2020 18:36:32 -0800
Subject: [PATCH] Linter fixes

---
 go/tasks/aws/config.go                        |  2 +-
 go/tasks/aws/config_flags.go                  |  2 +-
 go/tasks/config_load_test.go                  |  2 +-
 go/tasks/logs/logging_utils.go                |  2 +-
 go/tasks/logs/logging_utils_test.go           |  3 +-
 .../pluginmachinery/core/exec_metadata.go     |  3 +-
 go/tasks/pluginmachinery/core/phase.go        |  2 +-
 .../pluginmachinery/core/resource_manager.go  |  4 +-
 go/tasks/pluginmachinery/core/transition.go   |  2 +-
 .../pluginmachinery/flytek8s/config/config.go | 12 ++--
 .../config/k8spluginconfig_flags_test.go      |  2 +-
 .../flytek8s/container_helper.go              |  2 +-
 .../flytek8s/k8s_resource_adds.go             | 26 ++++----
 .../pluginmachinery/flytek8s/pod_helper.go    |  8 +--
 .../flytek8s/pod_helper_test.go               |  3 +-
 go/tasks/pluginmachinery/flytek8s/utils.go    |  2 -
 .../pluginmachinery/flytek8s/utils_test.go    |  1 -
 go/tasks/pluginmachinery/registry.go          |  9 ++-
 go/tasks/pluginmachinery/utils/template.go    |  2 +
 .../pluginmachinery/workqueue/queue_test.go   |  2 +-
 go/tasks/plugins/array/awsbatch/client.go     |  8 +--
 go/tasks/plugins/array/awsbatch/jobs_store.go |  9 +--
 .../plugins/array/awsbatch/jobs_store_test.go |  2 +-
 .../plugins/array/awsbatch/monitor_test.go    |  3 +-
 go/tasks/plugins/array/awsbatch/task_links.go |  4 +-
 .../plugins/array/awsbatch/transformer.go     |  2 +-
 go/tasks/plugins/array/catalog.go             |  8 +--
 go/tasks/plugins/array/k8s/monitor.go         |  2 +-
 go/tasks/plugins/hive/client/qubole_client.go | 13 ++--
 go/tasks/plugins/hive/client/qubole_status.go |  3 +-
 go/tasks/plugins/hive/config/config.go        | 28 ++++-----
 go/tasks/plugins/hive/execution_state.go      | 62 +++++++++----------
 go/tasks/plugins/hive/execution_state_test.go | 18 +++---
 go/tasks/plugins/hive/executions_cache.go     | 24 +++----
 .../plugins/hive/executions_cache_test.go     |  8 +--
 go/tasks/plugins/hive/executor.go             |  7 ++-
 36 files changed, 147 insertions(+), 145 deletions(-)

diff --git a/go/tasks/aws/config.go b/go/tasks/aws/config.go
index 4abac3845..5b69c8c76 100644
--- a/go/tasks/aws/config.go
+++ b/go/tasks/aws/config.go
@@ -29,7 +29,7 @@ var (
 // Config section for AWS Package
 type Config struct {
 	Region               string          `json:"region" pflag:",AWS Region to connect to."`
-	AccountID            string          `json:"accountId" pflag:",AWS Account Id."`
+	AccountID            string          `json:"accountId" pflag:",AWS Account Identifier."`
 	Retries              int             `json:"retries" pflag:",Number of retries."`
 	MaxErrorStringLength int             `json:"maxErrorLength" pflag:",Maximum size of error messages."`
 	CatalogCacheTimeout  config.Duration `json:"catalog-timeout" pflag:"\"5s\",Timeout duration for checking catalog for all batch tasks"`
diff --git a/go/tasks/aws/config_flags.go b/go/tasks/aws/config_flags.go
index 82c3f106a..12947bc0a 100755
--- a/go/tasks/aws/config_flags.go
+++ b/go/tasks/aws/config_flags.go
@@ -42,7 +42,7 @@ func (Config) mustMarshalJSON(v json.Marshaler) string {
 func (cfg Config) GetPFlagSet(prefix string) *pflag.FlagSet {
 	cmdFlags := pflag.NewFlagSet("Config", pflag.ExitOnError)
 	cmdFlags.String(fmt.Sprintf("%v%v", prefix, "region"), defaultConfig.Region, "AWS Region to connect to.")
-	cmdFlags.String(fmt.Sprintf("%v%v", prefix, "accountId"), defaultConfig.AccountID, "AWS Account Id.")
+	cmdFlags.String(fmt.Sprintf("%v%v", prefix, "accountId"), defaultConfig.AccountID, "AWS Account Identifier.")
 	cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "retries"), defaultConfig.Retries, "Number of retries.")
 	cmdFlags.Int(fmt.Sprintf("%v%v", prefix, "maxErrorLength"), defaultConfig.MaxErrorStringLength,
 		"Maximum size of error messages.")
 	cmdFlags.String(fmt.Sprintf("%v%v", prefix, "catalog-timeout"), defaultConfig.CatalogCacheTimeout.String(), "Timeout duration for checking catalog for all batch tasks")
diff --git a/go/tasks/config_load_test.go b/go/tasks/config_load_test.go
index 178f4fcf3..0c1627a27 100755
--- a/go/tasks/config_load_test.go
+++ b/go/tasks/config_load_test.go
@@ -67,7 +67,7 @@ func TestLoadConfig(t *testing.T) {
 		assert.Equal(t, []v1.Toleration{tolGPU}, k8sConfig.ResourceTolerations[v1.ResourceName("nvidia.com/gpu")])
 		assert.Equal(t, []v1.Toleration{tolStorage}, k8sConfig.ResourceTolerations[v1.ResourceStorage])

-		assert.Equal(t, "1000m", k8sConfig.DefaultCpuRequest)
+		assert.Equal(t, "1000m", k8sConfig.DefaultCPURequest)
 		assert.Equal(t, "1024Mi", k8sConfig.DefaultMemoryRequest)
 	})
diff --git a/go/tasks/logs/logging_utils.go b/go/tasks/logs/logging_utils.go
index f69cd49c0..8c7393c5f 100755
--- a/go/tasks/logs/logging_utils.go
+++ b/go/tasks/logs/logging_utils.go
@@ -6,7 +6,7 @@ import (
 	logUtils "github.com/lyft/flyteidl/clients/go/coreutils/logs"
 	"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/lyft/flytestdlib/logger"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 )

 func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, nameSuffix string) ([]*core.TaskLog, error) {
diff --git a/go/tasks/logs/logging_utils_test.go b/go/tasks/logs/logging_utils_test.go
index d6bdb4856..9bae2b300 100755
--- a/go/tasks/logs/logging_utils_test.go
+++ b/go/tasks/logs/logging_utils_test.go
@@ -73,8 +73,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) {
 				},
 			},
 		},
-		Status: v1.PodStatus{
-		},
+		Status: v1.PodStatus{},
 	}

 	pod.Name = podName
diff --git a/go/tasks/pluginmachinery/core/exec_metadata.go b/go/tasks/pluginmachinery/core/exec_metadata.go
index 113921eeb..22a3fec14 100644
--- a/go/tasks/pluginmachinery/core/exec_metadata.go
+++ b/go/tasks/pluginmachinery/core/exec_metadata.go
@@ -2,7 +2,7 @@ package core

 import (
 	"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 )
@@ -33,4 +33,3 @@ type TaskExecutionMetadata interface {
 	GetAnnotations() map[string]string
 	GetK8sServiceAccount() string
 }
-
diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go
index 059e43925..470cf1368 100644
--- a/go/tasks/pluginmachinery/core/phase.go
+++ b/go/tasks/pluginmachinery/core/phase.go
@@ -118,7 +118,7 @@ func (p PhaseInfo) String() string {
 	return fmt.Sprintf("Phase<%s:%d %s Reason:%s>", p.phase, p.version, p.info, p.reason)
 }

-// Undefined entity, associated with an error
+// PhaseInfoUndefined should be used when the Phase is unknown usually associated with an error
 var PhaseInfoUndefined = PhaseInfo{phase: PhaseUndefined}

 func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo) PhaseInfo {
diff --git a/go/tasks/pluginmachinery/core/resource_manager.go b/go/tasks/pluginmachinery/core/resource_manager.go
index 0b60c6198..74a3f53aa 100644
--- a/go/tasks/pluginmachinery/core/resource_manager.go
+++ b/go/tasks/pluginmachinery/core/resource_manager.go
@@ -83,7 +83,6 @@ type ResourceManager interface {
 	ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error
 }

-
 type ResourceConstraint struct {
 	Value int64
 }
@@ -95,7 +94,6 @@ type ResourceConstraint struct {
 // For example, a ResourceConstraintsSpec with nil ProjectScopeResourceConstraint and a non-nil NamespaceScopeResourceConstraint means
 // that it only poses a cap at the namespace level. A zero-value ResourceConstraintsSpec means there's no constraints posed at any level.
 type ResourceConstraintsSpec struct {
-	ProjectScopeResourceConstraint *ResourceConstraint
+	ProjectScopeResourceConstraint   *ResourceConstraint
 	NamespaceScopeResourceConstraint *ResourceConstraint
 }
-
diff --git a/go/tasks/pluginmachinery/core/transition.go b/go/tasks/pluginmachinery/core/transition.go
index f05a33c01..28f06884e 100644
--- a/go/tasks/pluginmachinery/core/transition.go
+++ b/go/tasks/pluginmachinery/core/transition.go
@@ -39,7 +39,7 @@ func (t Transition) String() string {
 	return fmt.Sprintf("%s,%s", t.ttype, t.info)
 }

-// Unknown/Undefined transition. To be returned when an error is observed
+// UnknownTransition is synonymous to UndefinedTransition. To be returned when an error is observed
 var UnknownTransition = Transition{TransitionTypeEphemeral, PhaseInfoUndefined}

 // Creates and returns a new Transition based on the PhaseInfo.Phase
diff --git a/go/tasks/pluginmachinery/flytek8s/config/config.go b/go/tasks/pluginmachinery/flytek8s/config/config.go
index 6883983eb..f703120a6 100755
--- a/go/tasks/pluginmachinery/flytek8s/config/config.go
+++ b/go/tasks/pluginmachinery/flytek8s/config/config.go
@@ -9,7 +9,7 @@ import (
 //go:generate pflags K8sPluginConfig

 const k8sPluginConfigSectionKey = "k8s"
-const defaultCpuRequest = "1000m"
+const defaultCPURequest = "1000m"
 const defaultMemoryRequest = "1024Mi"

 var (
@@ -19,8 +19,8 @@ var (
 		},
 	}

-	// Top level k8s plugin config section. If you are a plugin developer writing a k8s plugin,
-	// register your config section as a subsection to this.
+	// K8sPluginConfigSection provides a singular top level config section for all plugins.
+	// If you are a plugin developer writing a k8s plugin, register your config section as a subsection to this.
 	K8sPluginConfigSection = config.MustRegisterSubSection(k8sPluginConfigSectionKey, &defaultK8sConfig)
 )
@@ -40,7 +40,7 @@ type K8sPluginConfig struct {
 	// Currently we support simple resource based tolerations only
 	ResourceTolerations map[v1.ResourceName][]v1.Toleration `json:"resource-tolerations" pflag:"-,Default tolerations to be applied for resource of type 'key'"`
 	// default cpu requests for a container
-	DefaultCpuRequest string `json:"default-cpus" pflag:",Defines a default value for cpu for containers if not specified."`
+	DefaultCPURequest string `json:"default-cpus" pflag:",Defines a default value for cpu for containers if not specified."`
 	// default memory requests for a container
 	DefaultMemoryRequest string `json:"default-memory" pflag:",Defines a default value for memory for containers if not specified."`
 }
@@ -51,8 +51,8 @@ func GetK8sPluginConfig() *K8sPluginConfig {
 	if pluginsConfig.DefaultMemoryRequest == "" {
 		pluginsConfig.DefaultMemoryRequest = defaultMemoryRequest
 	}
-	if pluginsConfig.DefaultCpuRequest == "" {
-		pluginsConfig.DefaultCpuRequest = defaultCpuRequest
+	if pluginsConfig.DefaultCPURequest == "" {
+		pluginsConfig.DefaultCPURequest = defaultCPURequest
 	}
 	return pluginsConfig
 }
diff --git a/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go b/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go
index 1ecd13ef8..692c12853 100755
--- a/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go
+++ b/go/tasks/pluginmachinery/flytek8s/config/k8spluginconfig_flags_test.go
@@ -136,7 +136,7 @@ func TestK8sPluginConfig_SetFlags(t *testing.T) {
 			cmdFlags.Set("default-cpus", testValue)
 			if vString, err := cmdFlags.GetString("default-cpus"); err == nil {
-				testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vString), &actual.DefaultCpuRequest)
+				testDecodeJson_K8sPluginConfig(t, fmt.Sprintf("%v", vString), &actual.DefaultCPURequest)
 			} else {
 				assert.FailNow(t, err.Error())
diff --git a/go/tasks/pluginmachinery/flytek8s/container_helper.go b/go/tasks/pluginmachinery/flytek8s/container_helper.go
index f87d7e19f..d1e204b61 100755
--- a/go/tasks/pluginmachinery/flytek8s/container_helper.go
+++ b/go/tasks/pluginmachinery/flytek8s/container_helper.go
@@ -40,7 +40,7 @@ func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequiremen
 		if _, limitSet := resources.Limits[v1.ResourceCPU]; limitSet {
 			resources.Requests[v1.ResourceCPU] = resources.Limits[v1.ResourceCPU]
 		} else {
-			resources.Requests[v1.ResourceCPU] = resource.MustParse(config.GetK8sPluginConfig().DefaultCpuRequest)
+			resources.Requests[v1.ResourceCPU] = resource.MustParse(config.GetK8sPluginConfig().DefaultCPURequest)
 		}
 	}
diff --git a/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go b/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go
index 3baf5304c..8d758237e 100755
--- a/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go
+++ b/go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go
@@ -40,19 +40,19 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
 	}

 	// Execution level env variables.
-	nodeExecutionId := id.GetID().NodeExecutionId.ExecutionId
+	nodeExecutionID := id.GetID().NodeExecutionId.ExecutionId
 	envVars := []v1.EnvVar{
 		{
 			Name:  "FLYTE_INTERNAL_EXECUTION_ID",
-			Value: nodeExecutionId.Name,
+			Value: nodeExecutionID.Name,
 		},
 		{
 			Name:  "FLYTE_INTERNAL_EXECUTION_PROJECT",
-			Value: nodeExecutionId.Project,
+			Value: nodeExecutionID.Project,
 		},
 		{
 			Name:  "FLYTE_INTERNAL_EXECUTION_DOMAIN",
-			Value: nodeExecutionId.Domain,
+			Value: nodeExecutionID.Domain,
 		},
 		// TODO: Fill in these
 		// {
@@ -67,42 +67,42 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {

 	// Task definition Level env variables.
 	if id.GetID().TaskId != nil {
-		taskId := id.GetID().TaskId
+		taskID := id.GetID().TaskId
 		envVars = append(envVars,
 			v1.EnvVar{
 				Name:  "FLYTE_INTERNAL_TASK_PROJECT",
-				Value: taskId.Project,
+				Value: taskID.Project,
 			},
 			v1.EnvVar{
 				Name:  "FLYTE_INTERNAL_TASK_DOMAIN",
-				Value: taskId.Domain,
+				Value: taskID.Domain,
 			},
 			v1.EnvVar{
 				Name:  "FLYTE_INTERNAL_TASK_NAME",
-				Value: taskId.Name,
+				Value: taskID.Name,
 			},
 			v1.EnvVar{
 				Name:  "FLYTE_INTERNAL_TASK_VERSION",
-				Value: taskId.Version,
+				Value: taskID.Version,
 			},
 			// Historic Task Definition Level env variables.
 			// Remove these once SDK is migrated to use the new ones.
 			v1.EnvVar{
 				Name:  "FLYTE_INTERNAL_PROJECT",
-				Value: taskId.Project,
+				Value: taskID.Project,
 			},
 			v1.EnvVar{
 				Name:  "FLYTE_INTERNAL_DOMAIN",
-				Value: taskId.Domain,
+				Value: taskID.Domain,
 			},
 			v1.EnvVar{
 				Name:  "FLYTE_INTERNAL_NAME",
-				Value: taskId.Name,
+				Value: taskID.Name,
 			},
 			v1.EnvVar{
 				Name:  "FLYTE_INTERNAL_VERSION",
-				Value: taskId.Version,
+				Value: taskID.Version,
 			})
 	}
diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go
index c9e342342..2a21ca62e 100755
--- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go
+++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go
@@ -180,9 +180,9 @@ func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo
 	return pluginsCore.PhaseInfoSuccess(&info), nil
 }

-func ConvertPodFailureToError(status v1.PodStatus) (code, message string) {
-	code = "UnknownError"
-	message = "Container/Pod failed. No message received from kubernetes."
+func ConvertPodFailureToError(status v1.PodStatus) (string, string) {
+	code := "UnknownError"
+	message := "Container/Pod failed. No message received from kubernetes."
 	if len(status.Reason) > 0 {
 		code = status.Reason
 	}
@@ -210,7 +210,7 @@ func ConvertPodFailureToError(status v1.PodStatus) (code, message string) {
 				containerState.Terminated.Message)
 		}
 	}
-	return
+	return code, message
 }

 func GetLastTransitionOccurredAt(pod *v1.Pod) v12.Time {
diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
index 5ee17eaa5..e729d22ef 100755
--- a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
+++ b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
@@ -2,9 +2,10 @@ package flytek8s

 import (
 	"context"
+	"testing"
+
 	"github.com/lyft/flytestdlib/storage"
 	"github.com/stretchr/testify/mock"
-	"testing"

 	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
 	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
diff --git a/go/tasks/pluginmachinery/flytek8s/utils.go b/go/tasks/pluginmachinery/flytek8s/utils.go
index 0d3c0f88d..ca8658c5d 100755
--- a/go/tasks/pluginmachinery/flytek8s/utils.go
+++ b/go/tasks/pluginmachinery/flytek8s/utils.go
@@ -12,5 +12,3 @@ func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar {
 	}
 	return envVars
 }
-
-
diff --git a/go/tasks/pluginmachinery/flytek8s/utils_test.go b/go/tasks/pluginmachinery/flytek8s/utils_test.go
index ffab2ccbb..6ac2bcac9 100755
--- a/go/tasks/pluginmachinery/flytek8s/utils_test.go
+++ b/go/tasks/pluginmachinery/flytek8s/utils_test.go
@@ -1,2 +1 @@
 package flytek8s
-
diff --git a/go/tasks/pluginmachinery/registry.go b/go/tasks/pluginmachinery/registry.go
index b7d9eb1f0..e88aa2ba1 100644
--- a/go/tasks/pluginmachinery/registry.go
+++ b/go/tasks/pluginmachinery/registry.go
@@ -19,7 +19,7 @@ type taskPluginRegistry struct {
 // A singleton variable that maintains a registry of all plugins. The framework uses this to access all plugins
 var pluginRegistry = &taskPluginRegistry{}

-func PluginRegistry() *taskPluginRegistry {
+func PluginRegistry() TaskPluginRegistry {
 	return pluginRegistry
 }

@@ -76,3 +76,10 @@ func (p *taskPluginRegistry) GetK8sPlugins() []k8s.PluginEntry {
 	defer p.m.Unlock()
 	return append(p.k8sPlugin[:0:0], p.k8sPlugin...)
 }
+
+type TaskPluginRegistry interface {
+	RegisterK8sPlugin(info k8s.PluginEntry)
+	RegisterCorePlugin(info core.PluginEntry)
+	GetCorePlugins() []core.PluginEntry
+	GetK8sPlugins() []k8s.PluginEntry
+}
diff --git a/go/tasks/pluginmachinery/utils/template.go b/go/tasks/pluginmachinery/utils/template.go
index e6f858b32..5f1a777d1 100755
--- a/go/tasks/pluginmachinery/utils/template.go
+++ b/go/tasks/pluginmachinery/utils/template.go
@@ -9,6 +9,7 @@ import (

 	"github.com/golang/protobuf/ptypes"
 	"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
+	"github.com/lyft/flytestdlib/logger"
 	"github.com/pkg/errors"

 	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
@@ -140,6 +141,7 @@ func serializeLiteral(ctx context.Context, l *core.Literal) (string, error) {
 	case *core.Literal_Scalar:
 		return serializeLiteralScalar(o.Scalar)
 	default:
+		logger.Debugf(ctx, "received unexpected primitive type")
 		return "", fmt.Errorf("received an unexpected primitive type [%v]", reflect.TypeOf(l.Value))
 	}
 }
diff --git a/go/tasks/pluginmachinery/workqueue/queue_test.go b/go/tasks/pluginmachinery/workqueue/queue_test.go
index ca3faf125..64637a931 100644
--- a/go/tasks/pluginmachinery/workqueue/queue_test.go
+++ b/go/tasks/pluginmachinery/workqueue/queue_test.go
@@ -133,7 +133,7 @@ func Test_workItemCache_Add(t *testing.T) {

 func Test_queue_Queue(t *testing.T) {
 	t.Run("Err when not started", func(t *testing.T) {
-		q, err := NewIndexedWorkQueue("test1", newSingleStatusProcessor("hello", WorkStatusSucceeded), Config{Workers: 1, MaxRetries: 0, IndexCacheMaxItems: 1}, promutils.NewTestScope())
+		q, err := NewIndexedWorkQueue("test1", newSingleStatusProcessor("hello", WorkStatusFailed), Config{Workers: 1, MaxRetries: 0, IndexCacheMaxItems: 1}, promutils.NewTestScope())
 		assert.NoError(t, err)
 		assert.Error(t, q.Queue(context.TODO(), "abc", "abc"))
 	})
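
A note on the registry.go change above: PluginRegistry() now returns the new TaskPluginRegistry interface rather than the unexported *taskPluginRegistry, which lets consumers substitute their own implementation in tests. A minimal sketch of what that enables, assuming the pluginmachinery core and k8s import paths used elsewhere in this patch (the fakeRegistry type below is hypothetical and not part of this change):

package pluginmachinery_test

import (
	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
	"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
)

// fakeRegistry records registrations so a test can assert on them without
// touching the process-wide singleton; it satisfies TaskPluginRegistry.
type fakeRegistry struct {
	corePlugins []core.PluginEntry
	k8sPlugins  []k8s.PluginEntry
}

func (f *fakeRegistry) RegisterCorePlugin(info core.PluginEntry) { f.corePlugins = append(f.corePlugins, info) }
func (f *fakeRegistry) RegisterK8sPlugin(info k8s.PluginEntry)   { f.k8sPlugins = append(f.k8sPlugins, info) }
func (f *fakeRegistry) GetCorePlugins() []core.PluginEntry       { return f.corePlugins }
func (f *fakeRegistry) GetK8sPlugins() []k8s.PluginEntry         { return f.k8sPlugins }
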
diff --git a/go/tasks/plugins/array/awsbatch/client.go b/go/tasks/plugins/array/awsbatch/client.go
index 8d0272cb6..4ad103ba0 100644
--- a/go/tasks/plugins/array/awsbatch/client.go
+++ b/go/tasks/plugins/array/awsbatch/client.go
@@ -56,7 +56,7 @@ type client struct {
 	getRateLimiter     utils.RateLimiter
 	defaultRateLimiter utils.RateLimiter
 	region             string
-	accountId          string
+	accountID          string
 }

 func (b client) GetRegion() string {
@@ -64,7 +64,7 @@ func (b client) GetRegion() string {
 }

 func (b client) GetAccountID() string {
-	return b.accountId
+	return b.accountID
 }

 // Registers a new job definition. There is no deduping on AWS side (even for the same name).
@@ -164,12 +164,12 @@ func NewBatchClient(awsClient aws.Client,
 		getRateLimiter,
 		defaultRateLimiter)
 }

-func NewCustomBatchClient(batchClient BatchServiceClient, accountId, region string,
+func NewCustomBatchClient(batchClient BatchServiceClient, accountID, region string,
 	getRateLimiter utils.RateLimiter,
 	defaultRateLimiter utils.RateLimiter) Client {
 	return &client{
 		Batch:              batchClient,
-		accountId:          accountId,
+		accountID:          accountID,
 		region:             region,
 		getRateLimiter:     getRateLimiter,
 		defaultRateLimiter: defaultRateLimiter,
diff --git a/go/tasks/plugins/array/awsbatch/jobs_store.go b/go/tasks/plugins/array/awsbatch/jobs_store.go
index 6c609d0c5..aec235bb9 100644
--- a/go/tasks/plugins/array/awsbatch/jobs_store.go
+++ b/go/tasks/plugins/array/awsbatch/jobs_store.go
@@ -98,7 +98,7 @@ func batchJobsForSync(_ context.Context, batchChunkSize int) cache.CreateBatches
 	}
 }

-func updateJob(ctx context.Context, source *batch.JobDetail, target *Job) (updated bool, err error) {
+func updateJob(ctx context.Context, source *batch.JobDetail, target *Job) (updated bool) {
 	msg := make([]string, 0, 2)
 	if source.Status == nil {
 		logger.Warnf(ctx, "No status received for job [%v]", *source.JobId)
@@ -179,7 +179,7 @@ func updateJob(ctx context.Context, source *batch.JobDetail, target *Job) (updat
 	msg = append(msg, lastStatusReason)
 	target.Status.Message = strings.Join(msg, " - ")

-	return updated, nil
+	return updated
 }

 func minInt(a, b int) int {
@@ -258,10 +258,7 @@ func syncBatches(_ context.Context, client Client, handler EventHandler, batchCh
 			continue
 		}

-		changed, err := updateJob(ctx, jobDetail, job)
-		if err != nil {
-			return nil, err
-		}
+		changed := updateJob(ctx, jobDetail, job)

 		if changed {
 			handler.Updated(ctx, Event{
diff --git a/go/tasks/plugins/array/awsbatch/jobs_store_test.go b/go/tasks/plugins/array/awsbatch/jobs_store_test.go
index f02ce10bd..bcd3009e0 100644
--- a/go/tasks/plugins/array/awsbatch/jobs_store_test.go
+++ b/go/tasks/plugins/array/awsbatch/jobs_store_test.go
@@ -174,7 +174,7 @@ func BenchmarkStore_Get(b *testing.B) {
 	s := newJobsStoreWithSize(b, nil, b.N)
 	assert.NotNil(b, s)
 	createName := func(i int) string {
-		return fmt.Sprintf("Id%v", i)
+		return fmt.Sprintf("Identifier%v", i)
 	}

 	for i := 0; i < n; i++ {
diff --git a/go/tasks/plugins/array/awsbatch/monitor_test.go b/go/tasks/plugins/array/awsbatch/monitor_test.go
index f34e578ab..8251c865d 100644
--- a/go/tasks/plugins/array/awsbatch/monitor_test.go
+++ b/go/tasks/plugins/array/awsbatch/monitor_test.go
@@ -1,9 +1,10 @@
 package awsbatch

 import (
+	"testing"
+
 	"github.com/lyft/flytestdlib/contextutils"
 	"github.com/lyft/flytestdlib/promutils/labeled"
-	"testing"

 	"github.com/lyft/flytestdlib/promutils"
 	"github.com/lyft/flytestdlib/storage"
diff --git a/go/tasks/plugins/array/awsbatch/task_links.go b/go/tasks/plugins/array/awsbatch/task_links.go
index 84708bc23..626b49a13 100644
--- a/go/tasks/plugins/array/awsbatch/task_links.go
+++ b/go/tasks/plugins/array/awsbatch/task_links.go
@@ -21,7 +21,7 @@ const (
 	JobFormatter = "https://console.aws.amazon.com/batch/home?region=%v#/jobs/queue/arn:aws:batch:%v:%v:job-queue~2F%v/job/%v"
 )

-func GetJobUri(jobSize int, accountID, region, queue, jobID string) string {
+func GetJobURI(jobSize int, accountID, region, queue, jobID string) string {
 	if jobSize > 1 {
 		return fmt.Sprintf(ArrayJobFormatter, region, jobID)
 	}
@@ -32,7 +32,7 @@ func GetJobTaskLog(jobSize int, accountID, region, queue, jobID string) *idlCore
 	return &idlCore.TaskLog{
 		Name: fmt.Sprintf("AWS Batch Job"),
-		Uri:  GetJobUri(jobSize, accountID, region, queue, jobID),
+		Uri:  GetJobURI(jobSize, accountID, region, queue, jobID),
 	}
 }
diff --git a/go/tasks/plugins/array/awsbatch/transformer.go b/go/tasks/plugins/array/awsbatch/transformer.go
index 5444a7eec..b64025e36 100644
--- a/go/tasks/plugins/array/awsbatch/transformer.go
+++ b/go/tasks/plugins/array/awsbatch/transformer.go
@@ -240,7 +240,7 @@ func jobPhaseToPluginsPhase(jobStatus string) pluginCore.Phase {
 	case batch.JobStatusSucceeded:
 		return pluginCore.PhaseSuccess
 	case batch.JobStatusFailed:
-		// Retryable failure vs Permanent can be overriden if the task writes an errors.pb in the output prefix.
+		// Retryable failure vs Permanent can be overridden if the task writes an errors.pb in the output prefix.
 		return pluginCore.PhaseRetryableFailure
 	}
diff --git a/go/tasks/plugins/array/catalog.go b/go/tasks/plugins/array/catalog.go
index 1161fa306..97e54b861 100644
--- a/go/tasks/plugins/array/catalog.go
+++ b/go/tasks/plugins/array/catalog.go
@@ -233,7 +233,7 @@ func WriteToCatalog(ctx context.Context, ownerSignal core.SignalAsync, catalogCl
 	return false, nil
 }

-func ConstructCatalogUploadRequests(keyId idlCore.Identifier, taskExecId idlCore.TaskExecutionIdentifier,
+func ConstructCatalogUploadRequests(keyID idlCore.Identifier, taskExecID idlCore.TaskExecutionIdentifier,
 	cacheVersion string, taskInterface idlCore.TypedInterface, whichTasksToCache *bitarray.BitSet,
 	inputReaders []io.InputReader, outputReaders []io.OutputReader) ([]catalog.UploadRequest, error) {

@@ -251,14 +251,14 @@ func ConstructCatalogUploadRequests(keyId idlCore.Identifier, taskExecId idlCore
 		wi := catalog.UploadRequest{
 			Key: catalog.Key{
-				Identifier:     keyId,
+				Identifier:     keyID,
 				InputReader:    input,
 				CacheVersion:   cacheVersion,
 				TypedInterface: taskInterface,
 			},
 			ArtifactData: outputReaders[idx],
 			ArtifactMetadata: catalog.Metadata{
-				TaskExecutionIdentifier: &taskExecId,
+				TaskExecutionIdentifier: &taskExecID,
 			},
 		}

@@ -339,7 +339,7 @@ func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskRe
 	iface.Outputs = makeSingularTaskInterface(iface.Outputs)

 	for idx, inputReader := range inputs {
-		// TODO: Check if Id or Interface are empty and return err
+		// TODO: Check if Identifier or Interface are empty and return err
 		item := catalog.DownloadRequest{
 			Key: catalog.Key{
 				Identifier: *t.Id,
diff --git a/go/tasks/plugins/array/k8s/monitor.go b/go/tasks/plugins/array/k8s/monitor.go
index ea632f0e8..58ccd852c 100644
--- a/go/tasks/plugins/array/k8s/monitor.go
+++ b/go/tasks/plugins/array/k8s/monitor.go
@@ -140,7 +140,7 @@ func CheckPodStatus(ctx context.Context, client core.KubeClient, name k8sTypes.N
 		return core.PhaseInfoFailed(core.PhaseRetryableFailure, &idlCore.ExecutionError{
 			Code:    string(k8serrors.ReasonForError(err)),
 			Message: err.Error(),
-			Kind: idlCore.ExecutionError_SYSTEM,
+			Kind:    idlCore.ExecutionError_SYSTEM,
 		}, &core.TaskInfo{
 			OccurredAt: &now,
 		}), nil
diff --git a/go/tasks/plugins/hive/client/qubole_client.go b/go/tasks/plugins/hive/client/qubole_client.go
index 2549e39d0..eac96047f 100644
--- a/go/tasks/plugins/hive/client/qubole_client.go
+++ b/go/tasks/plugins/hive/client/qubole_client.go
@@ -21,7 +21,6 @@ import (
 const (
 	logLinkFormat   = "?command_id=%s"
-	tokenKeyForAth  = "X-AUTH-TOKEN"
 	acceptHeaderKey = "Accept"
 	hiveCommandType = "HiveCommand"
 	killStatus      = "kill"
@@ -77,7 +76,7 @@ type quboleClient struct {
 func (q *quboleClient) getHeaders(url *url.URL, accountKey string) http.Header {
 	headers := make(http.Header)
-	headers.Set(tokenKeyForAth, accountKey)
+	headers.Set("X-AUTH-TOKEN", accountKey)
 	headers.Set(HeaderContentType, ContentTypeJSON)
 	headers.Set(acceptHeaderKey, ContentTypeJSON)
 	headers.Set(hostHeaderKey, url.Host)
@@ -155,12 +154,12 @@ func (q *quboleClient) executeRequest(ctx context.Context, method string, u *url
 }

 /*
-	Execute Hive Command on the QuboleClient Hive Cluster and return the CommandId
+	Execute Hive Command on the QuboleClient Hive Cluster and return the CommandID
 	param: context.Context ctx: The default go context.
 	param: string commandStr: the query to execute
 	param: uint32 timeoutVal: timeout for the query to execute in seconds
 	param: string ClusterLabel: label for cluster on which to execute the Hive Command.
-	return: *int64: CommandId for the command executed
+	return: *int64: CommandID for the command executed
 	return: error: error in-case of a failure
 */
 func (q *quboleClient) ExecuteHiveCommand(
@@ -220,7 +219,7 @@ func (q *quboleClient) ExecuteHiveCommand(
 /*
 	Terminate a QuboleClient command
 	param: context.Context ctx: The default go context.
-	param: string CommandId: the CommandId to terminate.
+	param: string CommandID: the CommandID to terminate.
 	return: error: error in-case of a failure
 */
 func (q *quboleClient) KillCommand(ctx context.Context, commandID string, accountKey string) error {
@@ -239,8 +238,8 @@ func (q *quboleClient) KillCommand(ctx context.Context, commandID string, accoun
 /*
 	Get the status of a QuboleClient command
 	param: context.Context ctx: The default go context.
-	param: string CommandId: the CommandId to fetch the status for
-	return: *string: commandStatus for the CommandId passed
+	param: string CommandID: the CommandID to fetch the status for
+	return: *string: commandStatus for the CommandID passed
 	return: error: error in-case of a failure
 */
 func (q *quboleClient) GetCommandStatus(ctx context.Context, commandID string, accountKey string) (QuboleStatus, error) {
diff --git a/go/tasks/plugins/hive/client/qubole_status.go b/go/tasks/plugins/hive/client/qubole_status.go
index 2cf80e5a4..fafe6dc03 100644
--- a/go/tasks/plugins/hive/client/qubole_status.go
+++ b/go/tasks/plugins/hive/client/qubole_status.go
@@ -2,8 +2,9 @@ package client

 import (
 	"context"
-	"github.com/lyft/flytestdlib/logger"
 	"strings"
+
+	"github.com/lyft/flytestdlib/logger"
 )

 // This type is meant only to encapsulate the response coming from Qubole as a type, it is
diff --git a/go/tasks/plugins/hive/config/config.go b/go/tasks/plugins/hive/config/config.go
index 490447ac5..25228525c 100644
--- a/go/tasks/plugins/hive/config/config.go
+++ b/go/tasks/plugins/hive/config/config.go
@@ -26,11 +26,11 @@ func MustParse(s string) config.URL {
 }

 type ClusterConfig struct {
-	PrimaryLabel string `json:"primaryLabel" pflag:",The primary label of a given service cluster"`
-	Labels []string `json:"labels" pflag:",Labels of a given service cluster"`
-	Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"`
-	ProjectScopeQuotaProportionCap float64 `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the service cluster"`
-	NamespaceScopeQuotaProportionCap float64 `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the service cluster"`
+	PrimaryLabel                     string   `json:"primaryLabel" pflag:",The primary label of a given service cluster"`
+	Labels                           []string `json:"labels" pflag:",Labels of a given service cluster"`
+	Limit                            int      `json:"limit" pflag:",Resource quota (in the number of outstanding requests) of the service cluster"`
+	ProjectScopeQuotaProportionCap   float64  `json:"projectScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a project in the service cluster"`
+	NamespaceScopeQuotaProportionCap float64  `json:"namespaceScopeQuotaProportionCap" pflag:",A floating point number between 0 and 1, specifying the maximum proportion of quotas allowed to allocate to a namespace in the service cluster"`
 }

 type DestinationClusterConfig struct {
@@ -47,7 +47,7 @@ var (
 		TokenKey:     "FLYTE_QUBOLE_CLIENT_TOKEN",
 		LruCacheSize: 2000,
 		Workers:      15,
-		ClusterConfigs:            []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 100, ProjectScopeQuotaProportionCap:0.7, NamespaceScopeQuotaProportionCap:0.7}},
+		ClusterConfigs:            []ClusterConfig{{PrimaryLabel: "default", Labels: []string{"default"}, Limit: 100, ProjectScopeQuotaProportionCap: 0.7, NamespaceScopeQuotaProportionCap: 0.7}},
 		DestinationClusterConfigs: []DestinationClusterConfig{},
 	}

@@ -56,14 +56,14 @@ var (

 // Qubole plugin configs
 type Config struct {
-	Endpoint config.URL `json:"endpoint" pflag:",Endpoint for qubole to use"`
-	CommandAPIPath config.URL `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."`
-	AnalyzeLinkPath config.URL `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."`
-	TokenKey string `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."`
-	LruCacheSize int `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
-	Workers int `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
-	ClusterConfigs []ClusterConfig `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
-	DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"`
+	Endpoint                  config.URL                 `json:"endpoint" pflag:",Endpoint for qubole to use"`
+	CommandAPIPath            config.URL                 `json:"commandApiPath" pflag:",API Path where commands can be launched on Qubole. Should be a valid url."`
+	AnalyzeLinkPath           config.URL                 `json:"analyzeLinkPath" pflag:",URL path where queries can be visualized on qubole website. Should be a valid url."`
+	TokenKey                  string                     `json:"quboleTokenKey" pflag:",Name of the key where to find Qubole token in the secret manager."`
+	LruCacheSize              int                        `json:"lruCacheSize" pflag:",Size of the AutoRefreshCache"`
+	Workers                   int                        `json:"workers" pflag:",Number of parallel workers to refresh the cache"`
+	ClusterConfigs            []ClusterConfig            `json:"clusterConfigs" pflag:"-,A list of cluster configs. Each of the configs corresponds to a service cluster"`
+	DestinationClusterConfigs []DestinationClusterConfig `json:"destinationClusterConfigs" pflag:"-,A list configs specifying the destination service cluster for (project, domain)"`
 }

 // Retrieves the current config value or default.
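
To make the hive config semantics above concrete: together with the ResourceConstraintsSpec from core/resource_manager.go earlier in this patch, a ClusterConfig with Limit 100 and ProjectScopeQuotaProportionCap 0.7 caps any single project at 70 outstanding requests on that cluster. A rough sketch of the derivation, using only types shown in this patch (the helper name is illustrative; the real logic lives in createResourceConstraintsSpec in execution_state.go below):

// constraintsFor converts a ClusterConfig into a ResourceConstraintsSpec.
// A nil constraint would mean no cap is enforced at that scope.
func constraintsFor(c ClusterConfig) core.ResourceConstraintsSpec {
	return core.ResourceConstraintsSpec{
		ProjectScopeResourceConstraint: &core.ResourceConstraint{
			Value: int64(float64(c.Limit) * c.ProjectScopeQuotaProportionCap), // e.g. 100 * 0.7 = 70
		},
		NamespaceScopeResourceConstraint: &core.ResourceConstraint{
			Value: int64(float64(c.Limit) * c.NamespaceScopeQuotaProportionCap),
		},
	}
}
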
diff --git a/go/tasks/plugins/hive/execution_state.go b/go/tasks/plugins/hive/execution_state.go
index a6134bbf9..5f4d1e3c2 100644
--- a/go/tasks/plugins/hive/execution_state.go
+++ b/go/tasks/plugins/hive/execution_state.go
@@ -51,7 +51,7 @@ type ExecutionState struct {
 	Phase ExecutionPhase

 	// This will store the command ID from Qubole
-	CommandId string `json:"command_id,omitempty"`
+	CommandID string `json:"command_id,omitempty"`
 	URI       string `json:"uri,omitempty"`

 	// This number keeps track of the number of failures within the sync function. Without this, what happens in
@@ -124,7 +124,7 @@ func MapExecutionStateToPhaseInfo(state ExecutionState, quboleClient client.Qubo

 func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog {
 	return &idlCore.TaskLog{
-		Name:          fmt.Sprintf("Status: %s [%s]", e.Phase, e.CommandId),
+		Name:          fmt.Sprintf("Status: %s [%s]", e.Phase, e.CommandID),
 		MessageFormat: idlCore.TaskLog_UNKNOWN,
 		Uri:           e.URI,
 	}
@@ -133,7 +133,7 @@ func ConstructTaskLog(e ExecutionState) *idlCore.TaskLog {
 func ConstructTaskInfo(e ExecutionState) *core.TaskInfo {
 	logs := make([]*idlCore.TaskLog, 0, 1)
 	t := time.Now()
-	if e.CommandId != "" {
+	if e.CommandID != "" {
 		logs = append(logs, ConstructTaskLog(e))
 		return &core.TaskInfo{
 			Logs:       logs,
@@ -176,22 +176,22 @@ func createResourceConstraintsSpec(ctx context.Context, _ core.TaskExecutionCont
 func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, metric QuboleHiveExecutorMetrics) (ExecutionState, error) {
 	newState := ExecutionState{}
-	uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
+	uniqueID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()

 	clusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx)
 	if err != nil {
-		return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueId)
+		return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when requesting allocation token %s", uniqueID)
 	}

 	resourceConstraintsSpec := createResourceConstraintsSpec(ctx, tCtx, clusterPrimaryLabel)

-	allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueId, resourceConstraintsSpec)
+	allocationStatus, err := tCtx.ResourceManager().AllocateResource(ctx, clusterPrimaryLabel, uniqueID, resourceConstraintsSpec)
 	if err != nil {
 		logger.Errorf(ctx, "Resource manager failed for TaskExecId [%s] token [%s]. error %s",
-			tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId, err)
-		return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error requesting allocation token %s", uniqueId)
+			tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueID, err)
+		return newState, errors.Wrapf(errors.ResourceManagerFailure, err, "Error requesting allocation token %s", uniqueID)
 	}
-	logger.Infof(ctx, "Allocation result for [%s] is [%s]", uniqueId, allocationStatus)
+	logger.Infof(ctx, "Allocation result for [%s] is [%s]", uniqueID, allocationStatus)

 	// Emitting the duration this execution has been waiting for a token allocation
 	if currentState.AllocationTokenRequestStartTime.IsZero() {
@@ -210,7 +210,7 @@ func GetAllocationToken(ctx context.Context, tCtx core.TaskExecutionContext, cur
 		newState.Phase = PhaseNotStarted
 	} else {
 		return newState, errors.Errorf(errors.ResourceManagerFailure, "Got bad allocation result [%s] for token [%s]",
-			allocationStatus, uniqueId)
+			allocationStatus, uniqueID)
 	}

 	return newState, nil
@@ -283,9 +283,9 @@ func mapLabelToPrimaryLabel(ctx context.Context, quboleCfg *config.Config, label
 }

 func mapProjectDomainToDestinationClusterLabel(ctx context.Context, tCtx core.TaskExecutionContext, quboleCfg *config.Config) (string, bool) {
-	tExecId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID()
-	project := tExecId.NodeExecutionId.GetExecutionId().GetProject()
-	domain := tExecId.NodeExecutionId.GetExecutionId().GetDomain()
+	tExecID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID()
+	project := tExecID.NodeExecutionId.GetExecutionId().GetProject()
+	domain := tExecID.NodeExecutionId.GetExecutionId().GetDomain()
 	logger.Debugf(ctx, "No clusterLabelOverride. Finding the pre-defined cluster label for (project: %v, domain: %v)", project, domain)
 	// Using a linear search because N is small
 	for _, m := range quboleCfg.DestinationClusterConfigs {
@@ -322,7 +322,7 @@ func getClusterPrimaryLabel(ctx context.Context, tCtx core.TaskExecutionContext,
 func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, quboleClient client.QuboleClient,
 	cache cache.AutoRefresh, cfg *config.Config) (ExecutionState, error) {

-	uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
+	uniqueID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()

 	apiKey, err := tCtx.SecretManager().Get(ctx, cfg.TokenKey)
 	if err != nil {
 		return currentState, errors.Wrapf(errors.RuntimeFailure, err, "Failed to read token from secrets manager")
@@ -340,27 +340,27 @@ func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt
 	if err != nil {
 		// If we failed, we'll keep the NotStarted state
 		currentState.CreationFailureCount = currentState.CreationFailureCount + 1
-		logger.Warnf(ctx, "Error creating Qubole query for %s, failure counts %d. Error: %s", uniqueId, currentState.CreationFailureCount, err)
+		logger.Warnf(ctx, "Error creating Qubole query for %s, failure counts %d. Error: %s", uniqueID, currentState.CreationFailureCount, err)
 	} else {
 		// If we succeed, then store the command id returned from Qubole, and update our state. Also, add to the
 		// AutoRefreshCache so we start getting updates.
-		commandId := strconv.FormatInt(cmdDetails.ID, 10)
-		logger.Infof(ctx, "Created Qubole ID [%s] for token %s", commandId, uniqueId)
-		currentState.CommandId = commandId
+		commandID := strconv.FormatInt(cmdDetails.ID, 10)
+		logger.Infof(ctx, "Created Qubole ID [%s] for token %s", commandID, uniqueID)
+		currentState.CommandID = commandID
 		currentState.Phase = PhaseSubmitted
 		currentState.URI = cmdDetails.URI.String()

 		executionStateCacheItem := ExecutionStateCacheItem{
 			ExecutionState: currentState,
-			Id:             uniqueId,
+			Identifier:     uniqueID,
 		}

 		// The first time we put it in the cache, we know it won't have succeeded so we don't need to look at it
-		_, err := cache.GetOrCreate(uniqueId, executionStateCacheItem)
+		_, err := cache.GetOrCreate(uniqueID, executionStateCacheItem)
 		if err != nil {
 			// This means that our cache has fundamentally broken... return a system error
 			logger.Errorf(ctx, "Cache failed to GetOrCreate for execution [%s] cache key [%s], owner [%s]. Error %s",
-				tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId,
+				tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueID,
 				tCtx.TaskExecutionMetadata().GetOwnerReference(), err)
 			return currentState, err
 		}
@@ -372,17 +372,17 @@ func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt
 func MonitorQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, cache cache.AutoRefresh) (
 	ExecutionState, error) {

-	uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
+	uniqueID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
 	executionStateCacheItem := ExecutionStateCacheItem{
 		ExecutionState: currentState,
-		Id:             uniqueId,
+		Identifier:     uniqueID,
 	}

-	cachedItem, err := cache.GetOrCreate(uniqueId, executionStateCacheItem)
+	cachedItem, err := cache.GetOrCreate(uniqueID, executionStateCacheItem)
 	if err != nil {
 		// This means that our cache has fundamentally broken... return a system error
 		logger.Errorf(ctx, "Cache is broken on execution [%s] cache key [%s], owner [%s]. Error %s",
-			tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueId,
+			tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID(), uniqueID,
 			tCtx.TaskExecutionMetadata().GetOwnerReference(), err)
 		return currentState, errors.Wrapf(errors.CacheFailed, err, "Error when GetOrCreate while monitoring")
 	}
@@ -401,8 +401,8 @@ func MonitorQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt
 func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState ExecutionState, qubole client.QuboleClient, apiKey string) error {
 	// Cancel Qubole query if non-terminal state
-	if !InTerminalState(currentState) && currentState.CommandId != "" {
-		err := qubole.KillCommand(ctx, currentState.CommandId, apiKey)
+	if !InTerminalState(currentState) && currentState.CommandID != "" {
+		err := qubole.KillCommand(ctx, currentState.CommandID, apiKey)
 		if err != nil {
 			logger.Errorf(ctx, "Error terminating Qubole command in Finalize [%s]", err)
 			return err
@@ -413,16 +413,16 @@ func Abort(ctx context.Context, tCtx core.TaskExecutionContext, currentState Exe

 func Finalize(ctx context.Context, tCtx core.TaskExecutionContext, _ ExecutionState) error {
 	// Release allocation token
-	uniqueId := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
+	uniqueID := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()
 	clusterPrimaryLabel, err := composeResourceNamespaceWithClusterPrimaryLabel(ctx, tCtx)
 	if err != nil {
-		return errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when releasing allocation token %s", uniqueId)
+		return errors.Wrapf(errors.ResourceManagerFailure, err, "Error getting query info when releasing allocation token %s", uniqueID)
 	}

-	err = tCtx.ResourceManager().ReleaseResource(ctx, clusterPrimaryLabel, uniqueId)
+	err = tCtx.ResourceManager().ReleaseResource(ctx, clusterPrimaryLabel, uniqueID)
 	if err != nil {
-		logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueId, err)
+		logger.Errorf(ctx, "Error releasing allocation token [%s] in Finalize [%s]", uniqueID, err)
 		return err
 	}
 	return nil
diff --git a/go/tasks/plugins/hive/execution_state_test.go b/go/tasks/plugins/hive/execution_state_test.go
index c90670abf..6b59a75d1 100644
--- a/go/tasks/plugins/hive/execution_state_test.go
+++ b/go/tasks/plugins/hive/execution_state_test.go
@@ -110,7 +110,7 @@ func TestConstructTaskLog(t *testing.T) {
 	expected := "https://wellness.qubole.com/v2/analyze?command_id=123"
 	u, err := url.Parse(expected)
 	assert.NoError(t, err)
-	taskLog := ConstructTaskLog(ExecutionState{CommandId: "123", URI: u.String()})
+	taskLog := ConstructTaskLog(ExecutionState{CommandID: "123", URI: u.String()})
 	assert.Equal(t, expected, taskLog.Uri)
 }

@@ -124,7 +124,7 @@ func TestConstructTaskInfo(t *testing.T) {

 	e := ExecutionState{
 		Phase:            PhaseQuerySucceeded,
-		CommandId:        "123",
+		CommandID:        "123",
 		SyncFailureCount: 0,
 		URI:              u.String(),
 	}
@@ -257,7 +257,7 @@ func TestAbort(t *testing.T) {
 			x = true
 		}).Return(nil)

-		err := Abort(ctx, GetMockTaskExecutionContext(), ExecutionState{Phase: PhaseSubmitted, CommandId: "123456"}, mockQubole, "fake-key")
+		err := Abort(ctx, GetMockTaskExecutionContext(), ExecutionState{Phase: PhaseSubmitted, CommandID: "123456"}, mockQubole, "fake-key")
 		assert.NoError(t, err)
 		assert.True(t, x)
 	})
@@ -271,7 +271,7 @@ func TestAbort(t *testing.T) {

 		err := Abort(ctx, GetMockTaskExecutionContext(), ExecutionState{
 			Phase:     PhaseQuerySucceeded,
-			CommandId: "123456",
+			CommandID: "123456",
 		}, mockQubole, "fake-key")
 		assert.NoError(t, err)
 		assert.False(t, x)
@@ -305,7 +305,7 @@ func TestMonitorQuery(t *testing.T) {
 	mockCache := &mocks2.AutoRefresh{}
 	mockCache.OnGetOrCreateMatch("my_wf_exec_project:my_wf_exec_domain:my_wf_exec_name", mock.Anything).Return(ExecutionStateCacheItem{
 		ExecutionState: ExecutionState{Phase: PhaseQuerySucceeded},
-		Id:             "my_wf_exec_project:my_wf_exec_domain:my_wf_exec_name",
+		Identifier:     "my_wf_exec_project:my_wf_exec_domain:my_wf_exec_name",
 	}, nil).Run(func(_ mock.Arguments) {
 		getOrCreateCalled = true
 	})
@@ -341,7 +341,7 @@ func TestKickOffQuery(t *testing.T) {
 	newState, err := KickOffQuery(ctx, tCtx, state, mockQubole, mockCache, config.GetQuboleConfig())
 	assert.NoError(t, err)
 	assert.Equal(t, PhaseSubmitted, newState.Phase)
-	assert.Equal(t, "453298043", newState.CommandId)
+	assert.Equal(t, "453298043", newState.CommandID)
 	assert.True(t, getOrCreateCalled)
 	assert.True(t, quboleCalled)
 }
@@ -396,8 +396,8 @@ func Test_mapLabelToPrimaryLabel(t *testing.T) {

 func createMockTaskExecutionContextWithProjectDomain(project string, domain string) *mocks.TaskExecutionContext {
 	mockTaskExecutionContext := mocks.TaskExecutionContext{}
-	taskExecId := &pluginsCoreMocks.TaskExecutionID{}
-	taskExecId.OnGetID().Return(idlCore.TaskExecutionIdentifier{
+	taskExecID := &pluginsCoreMocks.TaskExecutionID{}
+	taskExecID.OnGetID().Return(idlCore.TaskExecutionIdentifier{
 		NodeExecutionId: &idlCore.NodeExecutionIdentifier{ExecutionId: &idlCore.WorkflowExecutionIdentifier{
 			Project: project,
 			Domain:  domain,
@@ -406,7 +406,7 @@ func createMockTaskExecutionContextWithProjectDomain(project string, domain stri
 	})

 	taskMetadata := &pluginsCoreMocks.TaskExecutionMetadata{}
-	taskMetadata.OnGetTaskExecutionID().Return(taskExecId)
+	taskMetadata.OnGetTaskExecutionID().Return(taskExecID)
 	mockTaskExecutionContext.On("TaskExecutionMetadata").Return(taskMetadata)
 	return &mockTaskExecutionContext
 }
diff --git a/go/tasks/plugins/hive/executions_cache.go b/go/tasks/plugins/hive/executions_cache.go
index 9c54d3494..3e7347ffa 100644
--- a/go/tasks/plugins/hive/executions_cache.go
+++ b/go/tasks/plugins/hive/executions_cache.go
@@ -57,11 +57,11 @@ type ExecutionStateCacheItem struct {
 	// This ID is the cache key and so will need to be unique across all objects in the cache (it will probably be
 	// unique across all of Flyte) and needs to be deterministic.
 	// This will also be used as the allocation token for now.
-	Id string `json:"id"`
+	Identifier string `json:"id"`
 }

 func (e ExecutionStateCacheItem) ID() string {
-	return e.Id
+	return e.Identifier
 }

 // This basically grab an updated status from the Qubole API and store it in the cache
@@ -78,8 +78,8 @@ func (q *QuboleHiveExecutionsCache) SyncQuboleQuery(ctx context.Context, batch c
 			return nil, errors.Errorf(errors.CacheFailed, "Failed to cast [%v]", batch[0].GetID())
 		}

-		if executionStateCacheItem.CommandId == "" {
-			logger.Warnf(ctx, "Sync loop - CommandID is blank for [%s] skipping", executionStateCacheItem.Id)
+		if executionStateCacheItem.CommandID == "" {
+			logger.Warnf(ctx, "Sync loop - CommandID is blank for [%s] skipping", executionStateCacheItem.Identifier)
 			resp = append(resp, cache.ItemSyncResponse{
 				ID:     query.GetID(),
 				Item:   query.GetItem(),
@@ -90,16 +90,16 @@ func (q *QuboleHiveExecutionsCache) SyncQuboleQuery(ctx context.Context, batch c
 		}

 		logger.Debugf(ctx, "Sync loop - processing Hive job [%s] - cache key [%s]",
-			executionStateCacheItem.CommandId, executionStateCacheItem.Id)
+			executionStateCacheItem.CommandID, executionStateCacheItem.Identifier)

-		quboleApiKey, err := q.secretManager.Get(ctx, q.cfg.TokenKey)
+		quboleAPIKey, err := q.secretManager.Get(ctx, q.cfg.TokenKey)
 		if err != nil {
 			return nil, err
 		}

 		if InTerminalState(executionStateCacheItem.ExecutionState) {
 			logger.Debugf(ctx, "Sync loop - Qubole id [%s] in terminal state [%s]",
-				executionStateCacheItem.CommandId, executionStateCacheItem.Id)
+				executionStateCacheItem.CommandID, executionStateCacheItem.Identifier)

 			resp = append(resp, cache.ItemSyncResponse{
 				ID:     query.GetID(),
@@ -111,10 +111,10 @@ func (q *QuboleHiveExecutionsCache) SyncQuboleQuery(ctx context.Context, batch c
 		}

 		// Get an updated status from Qubole
-		logger.Debugf(ctx, "Querying Qubole for %s - %s", executionStateCacheItem.CommandId, executionStateCacheItem.Id)
-		commandStatus, err := q.quboleClient.GetCommandStatus(ctx, executionStateCacheItem.CommandId, quboleApiKey)
+		logger.Debugf(ctx, "Querying Qubole for %s - %s", executionStateCacheItem.CommandID, executionStateCacheItem.Identifier)
+		commandStatus, err := q.quboleClient.GetCommandStatus(ctx, executionStateCacheItem.CommandID, quboleAPIKey)
 		if err != nil {
-			logger.Errorf(ctx, "Error from Qubole command %s", executionStateCacheItem.CommandId)
+			logger.Errorf(ctx, "Error from Qubole command %s", executionStateCacheItem.CommandID)
 			executionStateCacheItem.SyncFailureCount++
 			// Make sure we don't return nil for the first argument, because that deletes it from the cache.
 			resp = append(resp, cache.ItemSyncResponse{
@@ -132,8 +132,8 @@ func (q *QuboleHiveExecutionsCache) SyncQuboleQuery(ctx context.Context, batch c
 		}

 		if newExecutionPhase > executionStateCacheItem.Phase {
-			logger.Infof(ctx, "Moving ExecutionPhase for %s %s from %s to %s", executionStateCacheItem.CommandId,
-				executionStateCacheItem.Id, executionStateCacheItem.Phase, newExecutionPhase)
+			logger.Infof(ctx, "Moving ExecutionPhase for %s %s from %s to %s", executionStateCacheItem.CommandID,
+				executionStateCacheItem.Identifier, executionStateCacheItem.Phase, newExecutionPhase)

 			executionStateCacheItem.Phase = newExecutionPhase
diff --git a/go/tasks/plugins/hive/executions_cache_test.go b/go/tasks/plugins/hive/executions_cache_test.go
index b80a2265b..cc33365b3 100644
--- a/go/tasks/plugins/hive/executions_cache_test.go
+++ b/go/tasks/plugins/hive/executions_cache_test.go
@@ -37,7 +37,7 @@ func TestQuboleHiveExecutionsCache_SyncQuboleQuery(t *testing.T) {
 		}
 		cacheItem := ExecutionStateCacheItem{
 			ExecutionState: state,
-			Id:             "some-id",
+			Identifier:     "some-id",
 		}

 		iw := &cacheMocks.ItemWrapper{}
@@ -67,15 +67,15 @@ func TestQuboleHiveExecutionsCache_SyncQuboleQuery(t *testing.T) {
 		}

 		state := ExecutionState{
-			CommandId: "123456",
+			CommandID: "123456",
 			Phase:     PhaseSubmitted,
 		}
 		cacheItem := ExecutionStateCacheItem{
 			ExecutionState: state,
-			Id:             "some-id",
+			Identifier:     "some-id",
 		}
 		mockQubole.OnGetCommandStatusMatch(mock.Anything, mock.MatchedBy(func(commandId string) bool {
-			return commandId == state.CommandId
+			return commandId == state.CommandID
 		}), mock.Anything).Return(client.QuboleStatusDone, nil)

 		iw := &cacheMocks.ItemWrapper{}
diff --git a/go/tasks/plugins/hive/executor.go b/go/tasks/plugins/hive/executor.go
index 3ee576265..3fdf24130 100644
--- a/go/tasks/plugins/hive/executor.go
+++ b/go/tasks/plugins/hive/executor.go
@@ -2,6 +2,7 @@ package hive

 import (
 	"context"
+
 	"github.com/lyft/flytestdlib/cache"

 	"github.com/lyft/flyteplugins/go/tasks/errors"
@@ -14,7 +15,7 @@ import (
 )

 // This is the name of this plugin effectively. In Flyte plugin configuration, use this string to enable this plugin.
-const quboleHiveExecutorId = "qubole-hive-executor"
+const quboleHiveExecutorID = "qubole-hive-executor"

 // Version of the custom state this plugin stores. Useful for backwards compatibility if you one day need to update
 // the structure of the stored state
@@ -147,7 +148,7 @@ func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient
 	}

 	return QuboleHiveExecutor{
-		id:           quboleHiveExecutorId,
+		id:           quboleHiveExecutorID,
 		cfg:          cfg,
 		metrics:      getQuboleHiveExecutorMetrics(scope),
 		quboleClient: quboleClient,
@@ -158,7 +159,7 @@ func NewQuboleHiveExecutor(ctx context.Context, cfg *config.Config, quboleClient
 func init() {
 	pluginMachinery.PluginRegistry().RegisterCorePlugin(
 		core.PluginEntry{
-			ID:                  quboleHiveExecutorId,
+			ID:                  quboleHiveExecutorID,
 			RegisteredTaskTypes: []core.TaskType{hiveTaskType},
 			LoadPlugin:          QuboleHiveExecutorLoader,
 			IsDefault:           false,
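
The rename keeps registration behavior identical: init() still publishes the executor under the ID "qubole-hive-executor". A small sketch of how a test might confirm the plugin is visible through the interface-typed accessor introduced in registry.go (the test name and the pluginmachinery import path are assumptions, not part of this patch):

package hive

import (
	"testing"

	"github.com/stretchr/testify/assert"

	pluginMachinery "github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
)

// Checks that init() registered the Qubole Hive executor under its ID.
func TestQuboleHiveExecutorIsRegistered(t *testing.T) {
	found := false
	for _, entry := range pluginMachinery.PluginRegistry().GetCorePlugins() {
		if entry.ID == quboleHiveExecutorID {
			found = true
			break
		}
	}
	assert.True(t, found)
}
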