Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Linter fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ketan Umare committed Mar 7, 2020
1 parent 741b94e commit 2eb2f8c
Show file tree
Hide file tree
Showing 36 changed files with 147 additions and 145 deletions.
2 changes: 1 addition & 1 deletion go/tasks/aws/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ var (
// Config section for AWS Package
type Config struct {
Region string `json:"region" pflag:",AWS Region to connect to."`
AccountID string `json:"accountId" pflag:",AWS Account Id."`
AccountID string `json:"accountId" pflag:",AWS Account Identifier."`
Retries int `json:"retries" pflag:",Number of retries."`
MaxErrorStringLength int `json:"maxErrorLength" pflag:",Maximum size of error messages."`
CatalogCacheTimeout config.Duration `json:"catalog-timeout" pflag:"\"5s\",Timeout duration for checking catalog for all batch tasks"`
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/aws/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/tasks/config_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestLoadConfig(t *testing.T) {

assert.Equal(t, []v1.Toleration{tolGPU}, k8sConfig.ResourceTolerations[v1.ResourceName("nvidia.com/gpu")])
assert.Equal(t, []v1.Toleration{tolStorage}, k8sConfig.ResourceTolerations[v1.ResourceStorage])
assert.Equal(t, "1000m", k8sConfig.DefaultCpuRequest)
assert.Equal(t, "1000m", k8sConfig.DefaultCPURequest)
assert.Equal(t, "1024Mi", k8sConfig.DefaultMemoryRequest)
})

Expand Down
2 changes: 1 addition & 1 deletion go/tasks/logs/logging_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
logUtils "github.com/lyft/flyteidl/clients/go/coreutils/logs"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
)

func GetLogsForContainerInPod(ctx context.Context, pod *v1.Pod, index uint32, nameSuffix string) ([]*core.TaskLog, error) {
Expand Down
3 changes: 1 addition & 2 deletions go/tasks/logs/logging_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ func TestGetLogsForContainerInPod_MissingStatus(t *testing.T) {
},
},
},
Status: v1.PodStatus{
},
Status: v1.PodStatus{},
}
pod.Name = podName

Expand Down
3 changes: 1 addition & 2 deletions go/tasks/pluginmachinery/core/exec_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package core

import (
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
Expand Down Expand Up @@ -33,4 +33,3 @@ type TaskExecutionMetadata interface {
GetAnnotations() map[string]string
GetK8sServiceAccount() string
}

2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (p PhaseInfo) String() string {
return fmt.Sprintf("Phase<%s:%d %s Reason:%s>", p.phase, p.version, p.info, p.reason)
}

// Undefined entity, associated with an error
// PhaseInfoUndefined should be used when the Phase is unknown usually associated with an error
var PhaseInfoUndefined = PhaseInfo{phase: PhaseUndefined}

func phaseInfo(p Phase, v uint32, err *core.ExecutionError, info *TaskInfo) PhaseInfo {
Expand Down
4 changes: 1 addition & 3 deletions go/tasks/pluginmachinery/core/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ type ResourceManager interface {
ReleaseResource(ctx context.Context, namespace ResourceNamespace, allocationToken string) error
}


type ResourceConstraint struct {
Value int64
}
Expand All @@ -95,7 +94,6 @@ type ResourceConstraint struct {
// For example, a ResourceConstraintsSpec with nil ProjectScopeResourceConstraint and a non-nil NamespaceScopeResourceConstraint means
// that it only poses a cap at the namespace level. A zero-value ResourceConstraintsSpec means there's no constraints posed at any level.
type ResourceConstraintsSpec struct {
ProjectScopeResourceConstraint *ResourceConstraint
ProjectScopeResourceConstraint *ResourceConstraint
NamespaceScopeResourceConstraint *ResourceConstraint
}

2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/core/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (t Transition) String() string {
return fmt.Sprintf("%s,%s", t.ttype, t.info)
}

// Unknown/Undefined transition. To be returned when an error is observed
// UnknownTransition is synonymous to UndefinedTransition. To be returned when an error is observed
var UnknownTransition = Transition{TransitionTypeEphemeral, PhaseInfoUndefined}

// Creates and returns a new Transition based on the PhaseInfo.Phase
Expand Down
12 changes: 6 additions & 6 deletions go/tasks/pluginmachinery/flytek8s/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
//go:generate pflags K8sPluginConfig

const k8sPluginConfigSectionKey = "k8s"
const defaultCpuRequest = "1000m"
const defaultCPURequest = "1000m"
const defaultMemoryRequest = "1024Mi"

var (
Expand All @@ -19,8 +19,8 @@ var (
},
}

// Top level k8s plugin config section. If you are a plugin developer writing a k8s plugin,
// register your config section as a subsection to this.
// K8sPluginConfigSection provides a singular top level config section for all plugins.
// If you are a plugin developer writing a k8s plugin, register your config section as a subsection to this.
K8sPluginConfigSection = config.MustRegisterSubSection(k8sPluginConfigSectionKey, &defaultK8sConfig)
)

Expand All @@ -40,7 +40,7 @@ type K8sPluginConfig struct {
// Currently we support simple resource based tolerations only
ResourceTolerations map[v1.ResourceName][]v1.Toleration `json:"resource-tolerations" pflag:"-,Default tolerations to be applied for resource of type 'key'"`
// default cpu requests for a container
DefaultCpuRequest string `json:"default-cpus" pflag:",Defines a default value for cpu for containers if not specified."`
DefaultCPURequest string `json:"default-cpus" pflag:",Defines a default value for cpu for containers if not specified."`
// default memory requests for a container
DefaultMemoryRequest string `json:"default-memory" pflag:",Defines a default value for memory for containers if not specified."`
}
Expand All @@ -51,8 +51,8 @@ func GetK8sPluginConfig() *K8sPluginConfig {
if pluginsConfig.DefaultMemoryRequest == "" {
pluginsConfig.DefaultMemoryRequest = defaultMemoryRequest
}
if pluginsConfig.DefaultCpuRequest == "" {
pluginsConfig.DefaultCpuRequest = defaultCpuRequest
if pluginsConfig.DefaultCPURequest == "" {
pluginsConfig.DefaultCPURequest = defaultCPURequest
}
return pluginsConfig
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func ApplyResourceOverrides(ctx context.Context, resources v1.ResourceRequiremen
if _, limitSet := resources.Limits[v1.ResourceCPU]; limitSet {
resources.Requests[v1.ResourceCPU] = resources.Limits[v1.ResourceCPU]
} else {
resources.Requests[v1.ResourceCPU] = resource.MustParse(config.GetK8sPluginConfig().DefaultCpuRequest)
resources.Requests[v1.ResourceCPU] = resource.MustParse(config.GetK8sPluginConfig().DefaultCPURequest)
}
}

Expand Down
26 changes: 13 additions & 13 deletions go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {
}

// Execution level env variables.
nodeExecutionId := id.GetID().NodeExecutionId.ExecutionId
nodeExecutionID := id.GetID().NodeExecutionId.ExecutionId
envVars := []v1.EnvVar{
{
Name: "FLYTE_INTERNAL_EXECUTION_ID",
Value: nodeExecutionId.Name,
Value: nodeExecutionID.Name,
},
{
Name: "FLYTE_INTERNAL_EXECUTION_PROJECT",
Value: nodeExecutionId.Project,
Value: nodeExecutionID.Project,
},
{
Name: "FLYTE_INTERNAL_EXECUTION_DOMAIN",
Value: nodeExecutionId.Domain,
Value: nodeExecutionID.Domain,
},
// TODO: Fill in these
// {
Expand All @@ -67,42 +67,42 @@ func GetExecutionEnvVars(id pluginsCore.TaskExecutionID) []v1.EnvVar {

// Task definition Level env variables.
if id.GetID().TaskId != nil {
taskId := id.GetID().TaskId
taskID := id.GetID().TaskId

envVars = append(envVars,
v1.EnvVar{
Name: "FLYTE_INTERNAL_TASK_PROJECT",
Value: taskId.Project,
Value: taskID.Project,
},
v1.EnvVar{
Name: "FLYTE_INTERNAL_TASK_DOMAIN",
Value: taskId.Domain,
Value: taskID.Domain,
},
v1.EnvVar{
Name: "FLYTE_INTERNAL_TASK_NAME",
Value: taskId.Name,
Value: taskID.Name,
},
v1.EnvVar{
Name: "FLYTE_INTERNAL_TASK_VERSION",
Value: taskId.Version,
Value: taskID.Version,
},
// Historic Task Definition Level env variables.
// Remove these once SDK is migrated to use the new ones.
v1.EnvVar{
Name: "FLYTE_INTERNAL_PROJECT",
Value: taskId.Project,
Value: taskID.Project,
},
v1.EnvVar{
Name: "FLYTE_INTERNAL_DOMAIN",
Value: taskId.Domain,
Value: taskID.Domain,
},
v1.EnvVar{
Name: "FLYTE_INTERNAL_NAME",
Value: taskId.Name,
Value: taskID.Name,
},
v1.EnvVar{
Name: "FLYTE_INTERNAL_VERSION",
Value: taskId.Version,
Value: taskID.Version,
})

}
Expand Down
8 changes: 4 additions & 4 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,9 @@ func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo
return pluginsCore.PhaseInfoSuccess(&info), nil
}

func ConvertPodFailureToError(status v1.PodStatus) (code, message string) {
code = "UnknownError"
message = "Container/Pod failed. No message received from kubernetes."
func ConvertPodFailureToError(status v1.PodStatus) (string, string) {
code := "UnknownError"
message := "Container/Pod failed. No message received from kubernetes."
if len(status.Reason) > 0 {
code = status.Reason
}
Expand Down Expand Up @@ -210,7 +210,7 @@ func ConvertPodFailureToError(status v1.PodStatus) (code, message string) {
containerState.Terminated.Message)
}
}
return
return code, message
}

func GetLastTransitionOccurredAt(pod *v1.Pod) v12.Time {
Expand Down
3 changes: 2 additions & 1 deletion go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package flytek8s

import (
"context"
"testing"

"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/mock"
"testing"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
Expand Down
2 changes: 0 additions & 2 deletions go/tasks/pluginmachinery/flytek8s/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,3 @@ func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar {
}
return envVars
}


1 change: 0 additions & 1 deletion go/tasks/pluginmachinery/flytek8s/utils_test.go
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
package flytek8s

9 changes: 8 additions & 1 deletion go/tasks/pluginmachinery/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type taskPluginRegistry struct {
// A singleton variable that maintains a registry of all plugins. The framework uses this to access all plugins
var pluginRegistry = &taskPluginRegistry{}

func PluginRegistry() *taskPluginRegistry {
func PluginRegistry() TaskPluginRegistry {
return pluginRegistry
}

Expand Down Expand Up @@ -76,3 +76,10 @@ func (p *taskPluginRegistry) GetK8sPlugins() []k8s.PluginEntry {
defer p.m.Unlock()
return append(p.k8sPlugin[:0:0], p.k8sPlugin...)
}

type TaskPluginRegistry interface {
RegisterK8sPlugin(info k8s.PluginEntry)
RegisterCorePlugin(info core.PluginEntry)
GetCorePlugins() []core.PluginEntry
GetK8sPlugins() []k8s.PluginEntry
}
2 changes: 2 additions & 0 deletions go/tasks/pluginmachinery/utils/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/golang/protobuf/ptypes"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"github.com/lyft/flytestdlib/logger"
"github.com/pkg/errors"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
Expand Down Expand Up @@ -140,6 +141,7 @@ func serializeLiteral(ctx context.Context, l *core.Literal) (string, error) {
case *core.Literal_Scalar:
return serializeLiteralScalar(o.Scalar)
default:
logger.Debugf(ctx, "received unexpected primitive type")
return "", fmt.Errorf("received an unexpected primitive type [%v]", reflect.TypeOf(l.Value))
}
}
2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/workqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func Test_workItemCache_Add(t *testing.T) {

func Test_queue_Queue(t *testing.T) {
t.Run("Err when not started", func(t *testing.T) {
q, err := NewIndexedWorkQueue("test1", newSingleStatusProcessor("hello", WorkStatusSucceeded), Config{Workers: 1, MaxRetries: 0, IndexCacheMaxItems: 1}, promutils.NewTestScope())
q, err := NewIndexedWorkQueue("test1", newSingleStatusProcessor("hello", WorkStatusFailed), Config{Workers: 1, MaxRetries: 0, IndexCacheMaxItems: 1}, promutils.NewTestScope())
assert.NoError(t, err)
assert.Error(t, q.Queue(context.TODO(), "abc", "abc"))
})
Expand Down
8 changes: 4 additions & 4 deletions go/tasks/plugins/array/awsbatch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ type client struct {
getRateLimiter utils.RateLimiter
defaultRateLimiter utils.RateLimiter
region string
accountId string
accountID string
}

func (b client) GetRegion() string {
return b.region
}

func (b client) GetAccountID() string {
return b.accountId
return b.accountID
}

// Registers a new job definition. There is no deduping on AWS side (even for the same name).
Expand Down Expand Up @@ -164,12 +164,12 @@ func NewBatchClient(awsClient aws.Client,
getRateLimiter, defaultRateLimiter)
}

func NewCustomBatchClient(batchClient BatchServiceClient, accountId, region string,
func NewCustomBatchClient(batchClient BatchServiceClient, accountID, region string,
getRateLimiter utils.RateLimiter,
defaultRateLimiter utils.RateLimiter) Client {
return &client{
Batch: batchClient,
accountId: accountId,
accountID: accountID,
region: region,
getRateLimiter: getRateLimiter,
defaultRateLimiter: defaultRateLimiter,
Expand Down
9 changes: 3 additions & 6 deletions go/tasks/plugins/array/awsbatch/jobs_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func batchJobsForSync(_ context.Context, batchChunkSize int) cache.CreateBatches
}
}

func updateJob(ctx context.Context, source *batch.JobDetail, target *Job) (updated bool, err error) {
func updateJob(ctx context.Context, source *batch.JobDetail, target *Job) (updated bool) {
msg := make([]string, 0, 2)
if source.Status == nil {
logger.Warnf(ctx, "No status received for job [%v]", *source.JobId)
Expand Down Expand Up @@ -179,7 +179,7 @@ func updateJob(ctx context.Context, source *batch.JobDetail, target *Job) (updat
msg = append(msg, lastStatusReason)

target.Status.Message = strings.Join(msg, " - ")
return updated, nil
return updated
}

func minInt(a, b int) int {
Expand Down Expand Up @@ -258,10 +258,7 @@ func syncBatches(_ context.Context, client Client, handler EventHandler, batchCh
continue
}

changed, err := updateJob(ctx, jobDetail, job)
if err != nil {
return nil, err
}
changed := updateJob(ctx, jobDetail, job)

if changed {
handler.Updated(ctx, Event{
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/array/awsbatch/jobs_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func BenchmarkStore_Get(b *testing.B) {
s := newJobsStoreWithSize(b, nil, b.N)
assert.NotNil(b, s)
createName := func(i int) string {
return fmt.Sprintf("Id%v", i)
return fmt.Sprintf("Identifier%v", i)
}

for i := 0; i < n; i++ {
Expand Down
Loading

0 comments on commit 2eb2f8c

Please sign in to comment.