Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Fix array log link phase to show interpreted phase instead #33

Merged
merged 4 commits into from
Nov 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, id pluginsCore.Ta
for k, v := range config.GetK8sPluginConfig().DefaultEnvVars {
envVars = append(envVars, v1.EnvVar{Name: k, Value: v})
}

return envVars
}

Expand Down
6 changes: 5 additions & 1 deletion go/tasks/plugins/array/awsbatch/task_links.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,17 @@ func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata
return logLinks, nil
}

detailedArrayStatus := state.GetArrayStatus().Detailed
for childIdx, subJob := range job.SubJobs {
originalIndex := core.CalculateOriginalIndex(childIdx, state.GetIndexesToCache())
finalPhaseIdx := detailedArrayStatus.GetItem(childIdx)
finalPhase := pluginCore.Phases[finalPhaseIdx]

// The caveat here is that we will mark all attempts with the final phase we are tracking in the state.
for attemptIdx, attempt := range subJob.Attempts {
if len(attempt.LogStream) > 0 {
logLinks = append(logLinks, &idlCore.TaskLog{
Name: fmt.Sprintf("AWS Batch #%v-%v (%v)", originalIndex, attemptIdx, subJob.Status.Phase),
Name: fmt.Sprintf("AWS Batch #%v-%v (%v)", originalIndex, attemptIdx, finalPhase),
Uri: fmt.Sprintf(LogStreamFormatter, jobStore.GetRegion(), attempt.LogStream),
})
}
Expand Down
8 changes: 4 additions & 4 deletions go/tasks/plugins/array/awsbatch/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionCon
return nil, err
}

envVars := getEnvVarsForTask(ctx, tCtx, taskTemplate.GetContainer().GetEnv(), cfg.DefaultEnvVars)
envVars := getEnvVarsForTask(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID(), taskTemplate.GetContainer().GetEnv(), cfg.DefaultEnvVars)
resources := newContainerResourcesFromContainerTask(ctx, taskTemplate.GetContainer())
return &batch.SubmitJobInput{
JobName: refStr(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()),
Expand Down Expand Up @@ -105,9 +105,9 @@ func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInpu
return batchInput
}

func getEnvVarsForTask(ctx context.Context, tCtx pluginCore.TaskExecutionContext, containerEnvVars []*core.KeyValuePair,
func getEnvVarsForTask(ctx context.Context, execID pluginCore.TaskExecutionID, containerEnvVars []*core.KeyValuePair,
defaultEnvVars map[string]string) []v1.EnvVar {
envVars := flytek8s.DecorateEnvVars(ctx, flytek8s.ToK8sEnvVar(containerEnvVars), tCtx.TaskExecutionMetadata().GetTaskExecutionID())
envVars := flytek8s.DecorateEnvVars(ctx, flytek8s.ToK8sEnvVar(containerEnvVars), execID)
m := make(map[string]string, len(envVars))
for _, envVar := range envVars {
m[envVar.Name] = envVar.Value
Expand All @@ -119,7 +119,7 @@ func getEnvVarsForTask(ctx context.Context, tCtx pluginCore.TaskExecutionContext

finalEnvVars := make([]v1.EnvVar, 0, len(m))
for key, val := range m {
finalEnvVars = append(envVars, v1.EnvVar{
finalEnvVars = append(finalEnvVars, v1.EnvVar{
Name: key,
Value: val,
})
Expand Down
26 changes: 26 additions & 0 deletions go/tasks/plugins/array/awsbatch/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"testing"

flyteK8sConfig "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

mocks2 "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -121,3 +123,27 @@ func TestArrayJobToBatchInput(t *testing.T) {
assert.NotNil(t, batchInput)
assert.Equal(t, *expectedBatchInput, *batchInput)
}

func Test_getEnvVarsForTask(t *testing.T) {
ctx := context.Background()
id := &mocks.TaskExecutionID{}
id.OnGetGeneratedName().Return("Job_Name")
id.OnGetID().Return(core.TaskExecutionIdentifier{})

assert.NoError(t, flyteK8sConfig.SetK8sPluginConfig(&flyteK8sConfig.K8sPluginConfig{
DefaultEnvVars: map[string]string{
"MyKey": "BadVal",
},
}))

envVars := getEnvVarsForTask(ctx, id, nil, map[string]string{
"MyKey": "MyVal",
})

assert.Equal(t, []v12.EnvVar{
{
Name: "MyKey",
Value: "MyVal",
},
}, envVars)
}