Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:lyft/flyteplugins into fairness-n…
Browse files Browse the repository at this point in the history
…amespace-cap
  • Loading branch information
bnsblue committed Feb 28, 2020
2 parents 5532621 + 28f9aeb commit ae9522c
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 35 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/golang/protobuf v1.3.3
github.com/googleapis/gnostic v0.4.1 // indirect
github.com/hashicorp/golang-lru v0.5.4
github.com/lyft/flyteidl v0.17.1
github.com/lyft/flyteidl v0.17.5
github.com/lyft/flytestdlib v0.3.2
github.com/magiconair/properties v1.8.1
github.com/mitchellh/mapstructure v1.1.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,8 @@ github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0 h1:NGL46+1RYcCXb3sShp0nQq
github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0/go.mod h1:/L5qH+AD540e7Cetbui1tuJeXdmNhO8jM6VkXeDdDhQ=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f h1:PGuAMDzAen0AulUfaEhNQMYmUpa41pAVo3zHI+GJsCM=
github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnznGEAqC3DcNm6yEj472xaFVfLM7hnYofMb12tQ=
github.com/lyft/flyteidl v0.17.1 h1:XXi8sTSzPVXG337S1ZbOTi7PHIBgy1sIehhQu1eZpyI=
github.com/lyft/flyteidl v0.17.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.17.5 h1:nuUixm2glaJ4orKw3t/G0y1iG3ikYUR6FLxQy6NPmNM=
github.com/lyft/flyteidl v0.17.5/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
github.com/lyft/flytestdlib v0.3.0/go.mod h1:LJPPJlkFj+wwVWMrQT3K5JZgNhZi2mULsCG4ZYhinhU=
github.com/lyft/flytestdlib v0.3.2 h1:bY6Y+Fg6Jdc7zY4GAYuR7t2hjWwynIdmRvtLcRNaGnw=
Expand Down
7 changes: 6 additions & 1 deletion go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
)

const DefaultPhaseVersion = uint32(0)
const SystemErrorCode = "SystemError"

//go:generate enumer -type=Phase

Expand Down Expand Up @@ -184,5 +185,9 @@ func PhaseInfoFailure(code, reason string, info *TaskInfo) PhaseInfo {
}

func PhaseInfoRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason}, info)
return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_USER}, info)
}

// PhaseInfoSystemRetryableFailure builds a PhaseInfo in the PhaseRetryableFailure phase whose ExecutionError is
// marked with Kind core.ExecutionError_SYSTEM. code and reason populate the error's Code and Message fields, and
// info is passed through unchanged to PhaseInfoFailed.
func PhaseInfoSystemRetryableFailure(code, reason string, info *TaskInfo) PhaseInfo {
return PhaseInfoFailed(PhaseRetryableFailure, &core.ExecutionError{Code: code, Message: reason, Kind: core.ExecutionError_SYSTEM}, info)
}
2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func DemystifyPending(status v1.PodStatus) (pluginsCore.PhaseInfo, error) {
// So by default, if the container is not waiting with the PodInitializing/ContainerCreating
// reasons, then we will assume a failure reason, and fail instantly
t := c.LastTransitionTime.Time
return pluginsCore.PhaseInfoRetryableFailure(c.Reason, c.Message, &pluginsCore.TaskInfo{
return pluginsCore.PhaseInfoSystemRetryableFailure(c.Reason, c.Message, &pluginsCore.TaskInfo{
OccurredAt: &t,
}), nil
}
Expand Down
21 changes: 2 additions & 19 deletions go/tasks/plugins/array/awsbatch/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,28 +118,11 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
}

func (e Executor) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error {
pluginState := &State{}
if _, err := tCtx.PluginStateReader().Get(pluginState); err != nil {
return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state")
}

if pluginState.State == nil {
pluginState.State = &arrayCore.State{}
}

p, _ := pluginState.GetPhase()
logger.Infof(ctx, "Abort is called with phase [%v]", p)

switch p {
case arrayCore.PhaseCheckingSubTaskExecutions:
return TerminateSubTasks(ctx, e.jobStore.Client, *pluginState.GetExternalJobID())
}

return nil
return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Aborted")
}

func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext) error {
return nil
return TerminateSubTasks(ctx, tCtx, e.jobStore.Client, "Finalized")
}

func NewExecutor(ctx context.Context, awsClient aws.Client, cfg *batchConfig.Config,
Expand Down
29 changes: 27 additions & 2 deletions go/tasks/plugins/array/awsbatch/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/lyft/flyteplugins/go/tasks/errors"

"github.com/lyft/flytestdlib/logger"

arrayCore "github.com/lyft/flyteplugins/go/tasks/plugins/array/core"
Expand Down Expand Up @@ -47,6 +49,7 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl

j, err := batchClient.SubmitJob(ctx, batchInput)
if err != nil {
logger.Errorf(ctx, "Failed to submit job [%+v]. Error: %v", batchInput, err)
return nil, err
}

Expand All @@ -66,6 +69,28 @@ func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchCl
return nextState, nil
}

func TerminateSubTasks(ctx context.Context, batchClient Client, jobID string) error {
return batchClient.TerminateJob(ctx, jobID, "aborted")
// TerminateSubTasks attempts to terminate the AWS Job if one is recorded in the pluginState. It reads the plugin
// state through tCtx and, when an external job ID is present, calls batchClient.TerminateJob with that ID and the
// given reason. When no job ID is recorded it does nothing and returns nil. If the plugin state cannot be read, a
// CorruptedPluginState error wrapping the underlying error is returned.
//
// This API is idempotent and should be safe to call multiple times on the same job. It'll result in multiple calls
// to AWS Batch in that case, however.
func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, batchClient Client, reason string) error {
pluginState := &State{}
if _, err := tCtx.PluginStateReader().Get(pluginState); err != nil {
return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to unmarshal custom state")
}

// This only makes sense if the task has "just" been kicked off. Assigning state here is meant to make subsequent
// code simpler.
if pluginState.State == nil {
pluginState.State = &arrayCore.State{}
}

// The phase is used for logging only; the second return value of GetPhase is deliberately ignored.
p, _ := pluginState.GetPhase()
logger.Infof(ctx, "TerminateSubTasks is called with phase [%v] and reason [%v]", p, reason)

// Only contact AWS Batch when a job ID has actually been recorded in the state.
if pluginState.GetExternalJobID() != nil {
jobID := *pluginState.GetExternalJobID()
logger.Infof(ctx, "Cancelling AWS Job [%v] because [%v].", jobID, reason)
return batchClient.TerminateJob(ctx, jobID, reason)
}

return nil
}
25 changes: 25 additions & 0 deletions go/tasks/plugins/array/awsbatch/launcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package awsbatch
import (
"testing"

"github.com/stretchr/testify/mock"

"k8s.io/apimachinery/pkg/api/resource"

core3 "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
Expand Down Expand Up @@ -126,6 +128,29 @@ func TestLaunchSubTasks(t *testing.T) {
})
}

// TestTerminateSubTasks verifies that TerminateSubTasks forwards the external job ID found in the plugin state,
// together with the supplied reason, to the batch client's TerminateJob call.
func TestTerminateSubTasks(t *testing.T) {
ctx := context.Background()
// The mocked state reader fills in the State passed to Get with a known external job ID.
pStateReader := &mocks.PluginStateReader{}
pStateReader.OnGetMatch(mock.Anything).Return(0, nil).Run(func(args mock.Arguments) {
s := args.Get(0).(*State)
s.ExternalJobID = refStr("abc-123")
})

tCtx := &mocks.TaskExecutionContext{}
tCtx.OnPluginStateReader().Return(pStateReader)

// Expect exactly one TerminateJob call carrying the job ID and reason used below.
batchClient := &mocks2.Client{}
batchClient.OnTerminateJob(ctx, "abc-123", "Test terminate").Return(nil).Once()

t.Run("Simple", func(t *testing.T) {
assert.NoError(t, TerminateSubTasks(ctx, tCtx, batchClient, "Test terminate"))
})

// Confirm that every expectation registered on the mocks above was met.
batchClient.AssertExpectations(t)
tCtx.AssertExpectations(t)
pStateReader.AssertExpectations(t)
}

func assertEqual(t testing.TB, a, b interface{}) {
if diff := deep.Equal(a, b); diff != nil {
t.Error(diff)
Expand Down
8 changes: 5 additions & 3 deletions go/tasks/plugins/array/awsbatch/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func getEnvVarsForTask(ctx context.Context, execID pluginCore.TaskExecutionID, c
}

func toTimeout(templateTimeout *duration.Duration, defaultTimeout time.Duration) *batch.JobTimeout {
if templateTimeout != nil {
if templateTimeout != nil && templateTimeout.Seconds > 0 {
return (&batch.JobTimeout{}).SetAttemptDurationSeconds(templateTimeout.GetSeconds())
}

Expand Down Expand Up @@ -168,8 +168,10 @@ func toContainerOverrides(ctx context.Context, command []string, overrides *v1.R
envVars []v1.EnvVar) *batch.ContainerOverrides {

return &batch.ContainerOverrides{
Memory: refInt(overrides.Limits.Memory().ScaledValue(resource.Mega)),
Vcpus: refInt(overrides.Limits.Cpu().ScaledValue(resource.Mega)),
// Batch expects memory override in megabytes.
Memory: refInt(overrides.Limits.Memory().ScaledValue(resource.Mega)),
// Batch expects a rounded number of whole CPUs.
Vcpus: refInt(overrides.Limits.Cpu().Value()),
Environment: toEnvironmentVariables(ctx, envVars),
Command: refStrSlice(command),
}
Expand Down
61 changes: 56 additions & 5 deletions go/tasks/plugins/array/awsbatch/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package awsbatch

import (
"context"
"fmt"
"testing"
"time"

"github.com/golang/protobuf/ptypes/duration"

"k8s.io/apimachinery/pkg/api/resource"

Expand Down Expand Up @@ -59,14 +63,61 @@ func TestResourceRequirementsToBatchRequirements(t *testing.T) {
}

for i, testCase := range memoryTests {
q, err := resource.ParseQuantity(testCase.Input)
if assert.NoError(t, err) {
assert.Equal(t, testCase.Expected, q.ScaledValue(resource.Mega),
"Expected != Actual for test case [%v] with Input [%v]", i, testCase.Input)
}
t.Run(fmt.Sprintf("Memory Test [%v] %v", i, testCase.Input), func(t *testing.T) {
q, err := resource.ParseQuantity(testCase.Input)
if assert.NoError(t, err) {
assert.Equal(t, testCase.Expected, q.ScaledValue(resource.Mega),
"Expected != Actual for test case [%v] with Input [%v]", i, testCase.Input)
}
})
}

cpuTests := []struct {
Input string
Expected int64
}{
// resource.Quantity rounds values up (takes the ceiling) to the nearest whole unit at the requested scale
{"200", 200},
{"1M", 1000000},
{"15000m", 15},
}

for i, testCase := range cpuTests {
t.Run(fmt.Sprintf("CPU Test [%v] %v", i, testCase.Input), func(t *testing.T) {
q, err := resource.ParseQuantity(testCase.Input)
if assert.NoError(t, err) {
assert.Equal(t, testCase.Expected, q.Value(),
"Expected != Actual for test case [%v] with Input [%v]", i, testCase.Input)
}
})
}
}

// TestToTimeout exercises toTimeout with combinations of a task-template timeout and a default timeout, checking
// which of the two ends up in the resulting JobTimeout's AttemptDurationSeconds.
func TestToTimeout(t *testing.T) {
// No template timeout and a zero default: no JobTimeout is expected at all.
t.Run("Nil", func(t *testing.T) {
timeout := toTimeout(nil, 0*time.Second)
assert.Nil(t, timeout)
})

// A positive template timeout is expected to take precedence over the default.
t.Run("TaskTemplate duration set", func(t *testing.T) {
timeout := toTimeout(&duration.Duration{Seconds: 100}, 3*24*time.Hour)
assert.NotNil(t, timeout.AttemptDurationSeconds)
assert.Equal(t, int64(100), *timeout.AttemptDurationSeconds)
})

// With no template timeout, the default is expected to be used.
t.Run("Default timeout used", func(t *testing.T) {
timeout := toTimeout(nil, 3*24*time.Hour)
assert.NotNil(t, timeout.AttemptDurationSeconds)
assert.Equal(t, int64((3 * 24 * time.Hour).Seconds()), *timeout.AttemptDurationSeconds)
})

// A template timeout of zero seconds is expected to fall back to the default.
t.Run("TaskTemplate duration set to 0", func(t *testing.T) {
timeout := toTimeout(&duration.Duration{Seconds: 0}, 3*24*time.Hour)
assert.NotNil(t, timeout.AttemptDurationSeconds)
assert.Equal(t, int64((3 * 24 * time.Hour).Seconds()), *timeout.AttemptDurationSeconds)
})
}

func TestArrayJobToBatchInput(t *testing.T) {
expectedBatchInput := &batch.SubmitJobInput{
ArrayProperties: &batch.ArrayProperties{
Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/array/k8s/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func CheckPodStatus(ctx context.Context, client core.KubeClient, name k8sTypes.N
return core.PhaseInfoFailed(core.PhaseRetryableFailure, &idlCore.ExecutionError{
Code: string(k8serrors.ReasonForError(err)),
Message: err.Error(),
Kind: idlCore.ExecutionError_SYSTEM,
}, &core.TaskInfo{
OccurredAt: &now,
}), nil
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func MapExecutionStateToPhaseInfo(state ExecutionState, quboleClient client.Qubo
case PhaseQueued:
// TODO: Turn into config
if state.CreationFailureCount > 5 {
phaseInfo = core.PhaseInfoRetryableFailure("QuboleFailure", "Too many creation attempts", nil)
phaseInfo = core.PhaseInfoSystemRetryableFailure("QuboleFailure", "Too many creation attempts", nil)
} else {
phaseInfo = core.PhaseInfoQueued(t, uint32(state.CreationFailureCount), "Waiting for Qubole launch")
}
Expand Down

0 comments on commit ae9522c

Please sign in to comment.