Skip to content

Commit

Permalink
Adding support for per-task PodTemplate configuration (flyteorg#308)
Browse files Browse the repository at this point in the history
* implemented

Signed-off-by: Daniel Rammer <[email protected]>

* unit tests working

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updated documentation

Signed-off-by: Daniel Rammer <[email protected]>

* removing unnecessarily commented lines

Signed-off-by: Daniel Rammer <[email protected]>

* updated unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* updated docs

Signed-off-by: Daniel Rammer <[email protected]>

* if user provides PodTemplate name and it doesn't exist we should fail

Signed-off-by: Daniel Rammer <[email protected]>

* updated for new flyteidl definition

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl

Signed-off-by: Daniel Rammer <[email protected]>

* working with either container or pod passed in task template

Signed-off-by: Daniel Rammer <[email protected]>

* cleaned up definitions

Signed-off-by: Daniel Rammer <[email protected]>

* fixing up unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* removed dead code

Signed-off-by: Daniel Rammer <[email protected]>

* removing more dead code

Signed-off-by: Daniel Rammer <[email protected]>

* remove more dead code

Signed-off-by: Daniel Rammer <[email protected]>

* correctly applying resources for container and pod tasks

Signed-off-by: Daniel Rammer <[email protected]>

* updated flyteidl version

Signed-off-by: Daniel Rammer <[email protected]>

* moved sidecar parsing to pod plugin

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit tests

Signed-off-by: Daniel Rammer <[email protected]>

* updating semantics to address review comments

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Feb 6, 2023
1 parent 471a262 commit c4abd4f
Show file tree
Hide file tree
Showing 17 changed files with 745 additions and 589 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v1.3.2
github.com/flyteorg/flyteidl v1.3.5
github.com/flyteorg/flytestdlib v1.0.11
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,8 @@ github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGE
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/fatih/structtag v1.2.0/go.mod h1:mBJUNpUnHmRKrKlQQlmCrh5PuhftFbNv8Ys4/aAZl94=
github.com/flyteorg/flyteidl v1.3.2 h1:s4DC8go2ou5LtZ+CFcS31r0mhv3baelNV81C1KZS26U=
github.com/flyteorg/flyteidl v1.3.2/go.mod h1:OJAq333OpInPnMhvVz93AlEjmlQ+t0FAD4aakIYE4OU=
github.com/flyteorg/flyteidl v1.3.5 h1:rSaWMndeENr0QxRKj02kp6N/qQdbgDwpFeZsZbvU45A=
github.com/flyteorg/flyteidl v1.3.5/go.mod h1:OJAq333OpInPnMhvVz93AlEjmlQ+t0FAD4aakIYE4OU=
github.com/flyteorg/flytestdlib v1.0.0/go.mod h1:QSVN5wIM1lM9d60eAEbX7NwweQXW96t5x4jbyftn89c=
github.com/flyteorg/flytestdlib v1.0.11 h1:f7B8x2/zMuimEVi4Jx0zqzvNhdi7aq7+ZWoqHsbp4F4=
github.com/flyteorg/flytestdlib v1.0.11/go.mod h1:nIBmBHtjTJvhZEn3e/EwVC/iMkR2tUX8hEiXjRBpH/s=
Expand Down
68 changes: 50 additions & 18 deletions go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ package flytek8s
import (
"context"

"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
pluginscore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template"
"k8s.io/apimachinery/pkg/util/validation"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"k8s.io/apimachinery/pkg/util/validation"
)

const resourceGPU = "gpu"
Expand Down Expand Up @@ -193,22 +194,16 @@ func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements
return resources
}

// ToK8sContainer transforms a task template target of type core.Container into a bare-bones kubernetes container, which
// can be further modified with flyte-specific customizations specified by various static and run-time attributes.
func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *core.TypedInterface, parameters template.Parameters) (*v1.Container, error) {
// Perform preliminary validations
if parameters.TaskExecMetadata.GetOverrides() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "platform/compiler error, overrides not set for task")
}
if parameters.TaskExecMetadata.GetOverrides() == nil || parameters.TaskExecMetadata.GetOverrides().GetResources() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
}
// BuildRawContainer constructs a Container based on the definition passed by the taskContainer and
// TaskExecutionMetadata.
func BuildRawContainer(ctx context.Context, taskContainer *core.Container, taskExecMetadata pluginscore.TaskExecutionMetadata) (*v1.Container, error) {
// Make the container name the same as the pod name, unless it violates K8s naming conventions
// Container names are subject to the DNS-1123 standard
containerName := parameters.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName()
containerName := taskExecMetadata.GetTaskExecutionID().GetGeneratedName()
if errs := validation.IsDNS1123Label(containerName); len(errs) > 0 {
containerName = rand.String(4)
}

container := &v1.Container{
Name: containerName,
Image: taskContainer.GetImage(),
Expand All @@ -217,12 +212,49 @@ func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *c
Env: ToK8sEnvVar(taskContainer.GetEnv()),
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
}
if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, container, iFace, taskContainer.DataConfig); err != nil {

return container, nil
}

// ToK8sContainer builds a Container based on the definition passed by the TaskExecutionContext. This involves applying
// all Flyte configuration including k8s plugins and resource requests.
func ToK8sContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error) {
taskTemplate, err := tCtx.TaskReader().Read(ctx)
if err != nil {
logger.Warnf(ctx, "failed to read task information when trying to construct container, err: %s", err.Error())
return nil, err
}

// validate arguments
if taskTemplate.GetContainer() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "unable to create container with no definition in TaskTemplate")
}
if tCtx.TaskExecutionMetadata().GetOverrides() == nil || tCtx.TaskExecutionMetadata().GetOverrides().GetResources() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
}

// build raw container
container, err := BuildRawContainer(ctx, taskTemplate.GetContainer(), tCtx.TaskExecutionMetadata())
if err != nil {
return nil, err
}

if container.SecurityContext == nil && config.GetK8sPluginConfig().DefaultSecurityContext != nil {
container.SecurityContext = config.GetK8sPluginConfig().DefaultSecurityContext.DeepCopy()
}

// add flyte resource customizations to the container
templateParameters := template.Parameters{
TaskExecMetadata: tCtx.TaskExecutionMetadata(),
Inputs: tCtx.InputReader(),
OutputPath: tCtx.OutputWriter(),
Task: tCtx.TaskReader(),
}

if err := AddFlyteCustomizationsToContainer(ctx, templateParameters, ResourceCustomizationModeAssignResources, container); err != nil {
return nil, err
}

return container, nil
}

Expand Down
63 changes: 43 additions & 20 deletions go/tasks/pluginmachinery/flytek8s/container_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,26 +335,45 @@ func TestMergeResources_PartialResourceKeys(t *testing.T) {
}

func TestToK8sContainer(t *testing.T) {
taskContainer := &core.Container{
Image: "myimage",
Args: []string{
"arg1",
"arg2",
"arg3",
},
Command: []string{
"com1",
"com2",
"com3",
},
Env: []*core.KeyValuePair{
{
Key: "k",
Value: "v",
taskTemplate := &core.TaskTemplate{
Type: "test",
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Image: "myimage",
Args: []string{
"arg1",
"arg2",
"arg3",
},
Command: []string{
"com1",
"com2",
"com3",
},
Env: []*core.KeyValuePair{
{
Key: "k",
Value: "v",
},
},
},
},
}

taskReader := &mocks.TaskReader{}
taskReader.On("Read", mock.Anything).Return(taskTemplate, nil)

inputReader := &mocks2.InputReader{}
inputReader.OnGetInputPath().Return(storage.DataReference("test-data-reference"))
inputReader.OnGetInputPrefixPath().Return(storage.DataReference("test-data-reference-prefix"))
inputReader.OnGetMatch(mock.Anything).Return(&core.LiteralMap{}, nil)

outputWriter := &mocks2.OutputWriter{}
outputWriter.OnGetOutputPrefixPath().Return("")
outputWriter.OnGetRawOutputPrefix().Return("")
outputWriter.OnGetCheckpointPrefix().Return("/checkpoint")
outputWriter.OnGetPreviousCheckpointsPrefix().Return("/prev")

mockTaskExecMetadata := mocks.TaskExecutionMetadata{}
mockTaskOverrides := mocks.TaskOverrides{}
mockTaskOverrides.OnGetResources().Return(&v1.ResourceRequirements{
Expand All @@ -364,12 +383,16 @@ func TestToK8sContainer(t *testing.T) {
})
mockTaskExecMetadata.OnGetOverrides().Return(&mockTaskOverrides)
mockTaskExecutionID := mocks.TaskExecutionID{}
mockTaskExecutionID.OnGetID().Return(core.TaskExecutionIdentifier{})
mockTaskExecutionID.OnGetGeneratedName().Return("gen_name")
mockTaskExecMetadata.OnGetTaskExecutionID().Return(&mockTaskExecutionID)
mockTaskExecMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{})

templateParameters := template.Parameters{
TaskExecMetadata: &mockTaskExecMetadata,
}
tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(&mockTaskExecMetadata)
tCtx.OnInputReader().Return(inputReader)
tCtx.OnTaskReader().Return(taskReader)
tCtx.OnOutputWriter().Return(outputWriter)

cfg := config.GetK8sPluginConfig()
allow := false
Expand All @@ -378,7 +401,7 @@ func TestToK8sContainer(t *testing.T) {
}
assert.NoError(t, config.SetK8sPluginConfig(cfg))

container, err := ToK8sContainer(context.TODO(), taskContainer, nil, templateParameters)
container, err := ToK8sContainer(context.TODO(), tCtx)
assert.NoError(t, err)
assert.Equal(t, container.Image, "myimage")
assert.EqualValues(t, []string{
Expand Down
Loading

0 comments on commit c4abd4f

Please sign in to comment.