Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Fix array log link phase to show interpreted phase instead (#33)
Browse files Browse the repository at this point in the history
* Use the correct phase in the log link

* Actually fix the env var override issue

* make unit tests deterministic

* PR Comments
  • Loading branch information
EngHabu authored Nov 16, 2019
1 parent 507218d commit 3a30f6d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 5 deletions.
1 change: 1 addition & 0 deletions go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, id pluginsCore.Ta
for k, v := range config.GetK8sPluginConfig().DefaultEnvVars {
envVars = append(envVars, v1.EnvVar{Name: k, Value: v})
}

return envVars
}

Expand Down
6 changes: 5 additions & 1 deletion go/tasks/plugins/array/awsbatch/task_links.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,17 @@ func GetTaskLinks(ctx context.Context, taskMeta pluginCore.TaskExecutionMetadata
return logLinks, nil
}

detailedArrayStatus := state.GetArrayStatus().Detailed
for childIdx, subJob := range job.SubJobs {
originalIndex := core.CalculateOriginalIndex(childIdx, state.GetIndexesToCache())
finalPhaseIdx := detailedArrayStatus.GetItem(childIdx)
finalPhase := pluginCore.Phases[finalPhaseIdx]

// The caveat here is that we will mark all attempts with the final phase we are tracking in the state.
for attemptIdx, attempt := range subJob.Attempts {
if len(attempt.LogStream) > 0 {
logLinks = append(logLinks, &idlCore.TaskLog{
Name: fmt.Sprintf("AWS Batch #%v-%v (%v)", originalIndex, attemptIdx, subJob.Status.Phase),
Name: fmt.Sprintf("AWS Batch #%v-%v (%v)", originalIndex, attemptIdx, finalPhase),
Uri: fmt.Sprintf(LogStreamFormatter, jobStore.GetRegion(), attempt.LogStream),
})
}
Expand Down
8 changes: 4 additions & 4 deletions go/tasks/plugins/array/awsbatch/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func FlyteTaskToBatchInput(ctx context.Context, tCtx pluginCore.TaskExecutionCon
return nil, err
}

envVars := getEnvVarsForTask(ctx, tCtx, taskTemplate.GetContainer().GetEnv(), cfg.DefaultEnvVars)
envVars := getEnvVarsForTask(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID(), taskTemplate.GetContainer().GetEnv(), cfg.DefaultEnvVars)
resources := newContainerResourcesFromContainerTask(ctx, taskTemplate.GetContainer())
return &batch.SubmitJobInput{
JobName: refStr(tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()),
Expand Down Expand Up @@ -105,9 +105,9 @@ func UpdateBatchInputForArray(_ context.Context, batchInput *batch.SubmitJobInpu
return batchInput
}

func getEnvVarsForTask(ctx context.Context, tCtx pluginCore.TaskExecutionContext, containerEnvVars []*core.KeyValuePair,
func getEnvVarsForTask(ctx context.Context, execID pluginCore.TaskExecutionID, containerEnvVars []*core.KeyValuePair,
defaultEnvVars map[string]string) []v1.EnvVar {
envVars := flytek8s.DecorateEnvVars(ctx, flytek8s.ToK8sEnvVar(containerEnvVars), tCtx.TaskExecutionMetadata().GetTaskExecutionID())
envVars := flytek8s.DecorateEnvVars(ctx, flytek8s.ToK8sEnvVar(containerEnvVars), execID)
m := make(map[string]string, len(envVars))
for _, envVar := range envVars {
m[envVar.Name] = envVar.Value
Expand All @@ -119,7 +119,7 @@ func getEnvVarsForTask(ctx context.Context, tCtx pluginCore.TaskExecutionContext

finalEnvVars := make([]v1.EnvVar, 0, len(m))
for key, val := range m {
finalEnvVars = append(envVars, v1.EnvVar{
finalEnvVars = append(finalEnvVars, v1.EnvVar{
Name: key,
Value: val,
})
Expand Down
26 changes: 26 additions & 0 deletions go/tasks/plugins/array/awsbatch/transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"testing"

flyteK8sConfig "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

mocks2 "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io/mocks"

"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -121,3 +123,27 @@ func TestArrayJobToBatchInput(t *testing.T) {
assert.NotNil(t, batchInput)
assert.Equal(t, *expectedBatchInput, *batchInput)
}

func Test_getEnvVarsForTask(t *testing.T) {
ctx := context.Background()
id := &mocks.TaskExecutionID{}
id.OnGetGeneratedName().Return("Job_Name")
id.OnGetID().Return(core.TaskExecutionIdentifier{})

assert.NoError(t, flyteK8sConfig.SetK8sPluginConfig(&flyteK8sConfig.K8sPluginConfig{
DefaultEnvVars: map[string]string{
"MyKey": "BadVal",
},
}))

envVars := getEnvVarsForTask(ctx, id, nil, map[string]string{
"MyKey": "MyVal",
})

assert.Equal(t, []v12.EnvVar{
{
Name: "MyKey",
Value: "MyVal",
},
}, envVars)
}

0 comments on commit 3a30f6d

Please sign in to comment.