Skip to content

Commit

Permalink
Merge k8s container and sidecar plugins (flyteorg#225)
Browse files Browse the repository at this point in the history
* initial commit

Signed-off-by: Daniel Rammer <[email protected]>

* setting container plugins primaryContainerName to '*' meaning the entire pod needs to complete

Signed-off-by: Daniel Rammer <[email protected]>

* added podBuilder interface

Signed-off-by: Daniel Rammer <[email protected]>

* building plugin with podBuilders and a defaultPodBuilder for easier testing

Signed-off-by: Daniel Rammer <[email protected]>

* unexported the podBuilder interface and it's functions

Signed-off-by: Daniel Rammer <[email protected]>

* converted sidecar tests to this pod plugin

Signed-off-by: Daniel Rammer <[email protected]>

* ported over container tests

Signed-off-by: Daniel Rammer <[email protected]>

* updated sidecar test variable names to work with container tests

Signed-off-by: Daniel Rammer <[email protected]>

* added comments for a little better documentation

Signed-off-by: Daniel Rammer <[email protected]>

* added mergeMapInto function to each annotation and label setting on sidecar

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* removed container and sidecar plugins and registered existing task types to use new pod plugin

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Jan 13, 2022
1 parent 4362382 commit 5582390
Show file tree
Hide file tree
Showing 8 changed files with 465 additions and 434 deletions.
90 changes: 0 additions & 90 deletions go/tasks/plugins/k8s/container/container.go

This file was deleted.

32 changes: 32 additions & 0 deletions go/tasks/plugins/k8s/pod/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package pod

import (
"context"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"

v1 "k8s.io/api/core/v1"
)

const (
containerTaskType = "container"
)

type containerPodBuilder struct {
}

func (containerPodBuilder) buildPodSpec(ctx context.Context, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, error) {
podSpec, err := flytek8s.ToK8sPodSpec(ctx, taskCtx)
if err != nil {
return nil, err
}

return podSpec, nil
}

func (containerPodBuilder) updatePodMetadata(ctx context.Context, pod *v1.Pod, task *core.TaskTemplate, taskCtx pluginsCore.TaskExecutionContext) error {
return nil
}
54 changes: 27 additions & 27 deletions ...s/plugins/k8s/container/container_test.go → go/tasks/plugins/k8s/pod/container_test.go
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
package container
package pod

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/mock"

"k8s.io/apimachinery/pkg/types"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"

pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
pluginsCoreMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s"
pluginsIOMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

v1 "k8s.io/api/core/v1"

"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)

var resourceRequirements = &v1.ResourceRequirements{
var containerResourceRequirements = &v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
Expand Down Expand Up @@ -106,9 +106,9 @@ func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []str
}

func TestContainerTaskExecutor_BuildIdentityResource(t *testing.T) {
c := Plugin{}
p := plugin{defaultPodBuilder, podBuilders}
taskMetadata := &pluginsCoreMock.TaskExecutionMetadata{}
r, err := c.BuildIdentityResource(context.TODO(), taskMetadata)
r, err := p.BuildIdentityResource(context.TODO(), taskMetadata)
assert.NoError(t, err)
assert.NotNil(t, r)
_, ok := r.(*v1.Pod)
Expand All @@ -117,19 +117,19 @@ func TestContainerTaskExecutor_BuildIdentityResource(t *testing.T) {
}

func TestContainerTaskExecutor_BuildResource(t *testing.T) {
c := Plugin{}
p := plugin{defaultPodBuilder, podBuilders}
command := []string{"command"}
args := []string{"{{.Input}}"}
taskCtx := dummyContainerTaskContext(resourceRequirements, command, args)
taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args)

r, err := c.BuildResource(context.TODO(), taskCtx)
r, err := p.BuildResource(context.TODO(), taskCtx)
assert.NoError(t, err)
assert.NotNil(t, r)
j, ok := r.(*v1.Pod)
assert.True(t, ok)

assert.NotEmpty(t, j.Spec.Containers)
assert.Equal(t, resourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU])
assert.Equal(t, containerResourceRequirements.Limits[v1.ResourceCPU], j.Spec.Containers[0].Resources.Limits[v1.ResourceCPU])

// TODO: Once configurable, test when setting storage is supported on the cluster vs not.
storageRes := j.Spec.Containers[0].Resources.Limits[v1.ResourceStorage]
Expand All @@ -142,29 +142,29 @@ func TestContainerTaskExecutor_BuildResource(t *testing.T) {
}

func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
c := Plugin{}
p := plugin{defaultPodBuilder, podBuilders}
j := &v1.Pod{
Status: v1.PodStatus{},
}

ctx := context.TODO()
t.Run("running", func(t *testing.T) {
j.Status.Phase = v1.PodRunning
phaseInfo, err := c.GetTaskPhase(ctx, nil, j)
phaseInfo, err := p.GetTaskPhase(ctx, nil, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase())
})

t.Run("queued", func(t *testing.T) {
j.Status.Phase = v1.PodPending
phaseInfo, err := c.GetTaskPhase(ctx, nil, j)
phaseInfo, err := p.GetTaskPhase(ctx, nil, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, phaseInfo.Phase())
})

t.Run("failNoCondition", func(t *testing.T) {
j.Status.Phase = v1.PodFailed
phaseInfo, err := c.GetTaskPhase(ctx, nil, j)
phaseInfo, err := p.GetTaskPhase(ctx, nil, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
ec := phaseInfo.Err().GetCode()
Expand All @@ -180,7 +180,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
Type: v1.PodReasonUnschedulable,
},
}
phaseInfo, err := c.GetTaskPhase(ctx, nil, j)
phaseInfo, err := p.GetTaskPhase(ctx, nil, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
ec := phaseInfo.Err().GetCode()
Expand All @@ -189,22 +189,22 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {

t.Run("success", func(t *testing.T) {
j.Status.Phase = v1.PodSucceeded
phaseInfo, err := c.GetTaskPhase(ctx, nil, j)
phaseInfo, err := p.GetTaskPhase(ctx, nil, j)
assert.NoError(t, err)
assert.NotNil(t, phaseInfo)
assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase())
})
}

func TestContainerTaskExecutor_GetProperties(t *testing.T) {
plugin := Plugin{}
p := plugin{defaultPodBuilder, podBuilders}
expected := k8s.PluginProperties{}
assert.Equal(t, expected, plugin.GetProperties())
assert.Equal(t, expected, p.GetProperties())
}

func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) {
ctx := context.TODO()
c := Plugin{}
p := plugin{defaultPodBuilder, podBuilders}
reason := "InvalidImageName"
message := "Failed to apply default image tag \"TEST/flyteorg/myapp:latest\": couldn't parse image reference" +
" \"TEST/flyteorg/myapp:latest\": invalid reference format: repository name must be lowercase"
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) {

t.Run("failInvalidImageName", func(t *testing.T) {
pendingPod.Status.Phase = v1.PodPending
phaseInfo, err := c.GetTaskPhase(ctx, nil, pendingPod)
phaseInfo, err := p.GetTaskPhase(ctx, nil, pendingPod)
finalReason := fmt.Sprintf("|%s", reason)
finalMessage := fmt.Sprintf("|%s", message)
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit 5582390

Please sign in to comment.