Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:flyteorg/flyteplugins into dbx-bug-1
Browse files Browse the repository at this point in the history
  • Loading branch information
pingsutw committed Mar 16, 2023
2 parents 292de7d + f0db511 commit ad8075c
Show file tree
Hide file tree
Showing 42 changed files with 1,108 additions and 1,018 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/master.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: '1.18'
go-version: '1.19'
- name: Run GoReleaser
uses: goreleaser/goreleaser-action@v2
with:
Expand All @@ -52,7 +52,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: '1.18'
go-version: '1.19'
- name: Unit Tests
run: make install && make test_unit_codecov
- name: Push CodeCov
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: '1.18'
go-version: '1.19'
- name: Unit Tests
run: make install && make test_unit_codecov
- name: Push CodeCov
Expand All @@ -30,6 +30,6 @@ jobs:
- uses: actions/checkout@v1
- uses: actions/setup-go@v2
with:
go-version: '1.18'
go-version: '1.19'
- name: Go generate and diff
run: DELTA_CHECK=true make generate
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/flyteorg/flyteplugins

go 1.18
go 1.19

require (
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625
Expand All @@ -12,8 +12,8 @@ require (
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v1.3.2
github.com/flyteorg/flytestdlib v1.0.11
github.com/flyteorg/flyteidl v1.3.12
github.com/flyteorg/flytestdlib v1.0.15
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.2
github.com/hashicorp/golang-lru v0.5.4
Expand Down
274 changes: 7 additions & 267 deletions go.sum

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions go/tasks/pluginmachinery/core/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,20 @@ type ResourceRegistrar interface {

// ResourceManager Interface
// 1. Terms and definitions
//
// - Resource: resource is an abstraction of anything that has a limited quota of units and can be claimed in a
// single unit or multiple units at once. At Flyte's current state, a resource means a logical
// separation (e.g., a cluster) of an external service that allows a limited number of outstanding
// requests to be sent to.
//
// - Token: Flyte uses a token to serve as the placeholder to represent a unit of resource. Flyte resource manager
// manages resources by managing the tokens of the resources.
//
// 2. Description
// 2. Description
// ResourceManager provides a task-type-specific pooling system for Flyte Tasks. Plugin writers can optionally
// request for resources in their tasks, in single quantity.
//
// 3. Usage
// 3. Usage
// A Flyte plugin registers the resources and the desired quota of each resource with ResourceRegistrar at the
// setup time of Flyte Propeller. At the end of the setup time, Flyte Propeller builds a ResourceManager based on
// these registration requests.
Expand All @@ -63,7 +65,7 @@ type ResourceRegistrar interface {
// the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be
// erased from the corresponding pool.
//
// 4. Example
// 4. Example
// Flyte has a built-on Qubole plugin that allows Flyte tasks to send out Hive commands to Qubole.
// In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes
// a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling
Expand Down
68 changes: 50 additions & 18 deletions go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@ package flytek8s
import (
"context"

"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
pluginscore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template"
"k8s.io/apimachinery/pkg/util/validation"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

"github.com/flyteorg/flytestdlib/logger"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/rand"

"github.com/flyteorg/flyteplugins/go/tasks/errors"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"k8s.io/apimachinery/pkg/util/validation"
)

const resourceGPU = "gpu"
Expand Down Expand Up @@ -193,22 +194,16 @@ func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements
return resources
}

// ToK8sContainer transforms a task template target of type core.Container into a bare-bones kubernetes container, which
// can be further modified with flyte-specific customizations specified by various static and run-time attributes.
func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *core.TypedInterface, parameters template.Parameters) (*v1.Container, error) {
// Perform preliminary validations
if parameters.TaskExecMetadata.GetOverrides() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "platform/compiler error, overrides not set for task")
}
if parameters.TaskExecMetadata.GetOverrides() == nil || parameters.TaskExecMetadata.GetOverrides().GetResources() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
}
// BuildRawContainer constructs a Container based on the definition passed by the taskContainer and
// TaskExecutionMetadata.
func BuildRawContainer(ctx context.Context, taskContainer *core.Container, taskExecMetadata pluginscore.TaskExecutionMetadata) (*v1.Container, error) {
// Make the container name the same as the pod name, unless it violates K8s naming conventions
// Container names are subject to the DNS-1123 standard
containerName := parameters.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName()
containerName := taskExecMetadata.GetTaskExecutionID().GetGeneratedName()
if errs := validation.IsDNS1123Label(containerName); len(errs) > 0 {
containerName = rand.String(4)
}

container := &v1.Container{
Name: containerName,
Image: taskContainer.GetImage(),
Expand All @@ -217,12 +212,49 @@ func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *c
Env: ToK8sEnvVar(taskContainer.GetEnv()),
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
}
if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, container, iFace, taskContainer.DataConfig); err != nil {

return container, nil
}

// ToK8sContainer builds a Container based on the definition passed by the TaskExecutionContext. This involves applying
// all Flyte configuration including k8s plugins and resource requests.
func ToK8sContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error) {
taskTemplate, err := tCtx.TaskReader().Read(ctx)
if err != nil {
logger.Warnf(ctx, "failed to read task information when trying to construct container, err: %s", err.Error())
return nil, err
}

// validate arguments
if taskTemplate.GetContainer() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "unable to create container with no definition in TaskTemplate")
}
if tCtx.TaskExecutionMetadata().GetOverrides() == nil || tCtx.TaskExecutionMetadata().GetOverrides().GetResources() == nil {
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
}

// build raw container
container, err := BuildRawContainer(ctx, taskTemplate.GetContainer(), tCtx.TaskExecutionMetadata())
if err != nil {
return nil, err
}

if container.SecurityContext == nil && config.GetK8sPluginConfig().DefaultSecurityContext != nil {
container.SecurityContext = config.GetK8sPluginConfig().DefaultSecurityContext.DeepCopy()
}

// add flyte resource customizations to the container
templateParameters := template.Parameters{
TaskExecMetadata: tCtx.TaskExecutionMetadata(),
Inputs: tCtx.InputReader(),
OutputPath: tCtx.OutputWriter(),
Task: tCtx.TaskReader(),
}

if err := AddFlyteCustomizationsToContainer(ctx, templateParameters, ResourceCustomizationModeAssignResources, container); err != nil {
return nil, err
}

return container, nil
}

Expand Down
63 changes: 43 additions & 20 deletions go/tasks/pluginmachinery/flytek8s/container_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,26 +335,45 @@ func TestMergeResources_PartialResourceKeys(t *testing.T) {
}

func TestToK8sContainer(t *testing.T) {
taskContainer := &core.Container{
Image: "myimage",
Args: []string{
"arg1",
"arg2",
"arg3",
},
Command: []string{
"com1",
"com2",
"com3",
},
Env: []*core.KeyValuePair{
{
Key: "k",
Value: "v",
taskTemplate := &core.TaskTemplate{
Type: "test",
Target: &core.TaskTemplate_Container{
Container: &core.Container{
Image: "myimage",
Args: []string{
"arg1",
"arg2",
"arg3",
},
Command: []string{
"com1",
"com2",
"com3",
},
Env: []*core.KeyValuePair{
{
Key: "k",
Value: "v",
},
},
},
},
}

taskReader := &mocks.TaskReader{}
taskReader.On("Read", mock.Anything).Return(taskTemplate, nil)

inputReader := &mocks2.InputReader{}
inputReader.OnGetInputPath().Return(storage.DataReference("test-data-reference"))
inputReader.OnGetInputPrefixPath().Return(storage.DataReference("test-data-reference-prefix"))
inputReader.OnGetMatch(mock.Anything).Return(&core.LiteralMap{}, nil)

outputWriter := &mocks2.OutputWriter{}
outputWriter.OnGetOutputPrefixPath().Return("")
outputWriter.OnGetRawOutputPrefix().Return("")
outputWriter.OnGetCheckpointPrefix().Return("/checkpoint")
outputWriter.OnGetPreviousCheckpointsPrefix().Return("/prev")

mockTaskExecMetadata := mocks.TaskExecutionMetadata{}
mockTaskOverrides := mocks.TaskOverrides{}
mockTaskOverrides.OnGetResources().Return(&v1.ResourceRequirements{
Expand All @@ -364,12 +383,16 @@ func TestToK8sContainer(t *testing.T) {
})
mockTaskExecMetadata.OnGetOverrides().Return(&mockTaskOverrides)
mockTaskExecutionID := mocks.TaskExecutionID{}
mockTaskExecutionID.OnGetID().Return(core.TaskExecutionIdentifier{})
mockTaskExecutionID.OnGetGeneratedName().Return("gen_name")
mockTaskExecMetadata.OnGetTaskExecutionID().Return(&mockTaskExecutionID)
mockTaskExecMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{})

templateParameters := template.Parameters{
TaskExecMetadata: &mockTaskExecMetadata,
}
tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskExecutionMetadata().Return(&mockTaskExecMetadata)
tCtx.OnInputReader().Return(inputReader)
tCtx.OnTaskReader().Return(taskReader)
tCtx.OnOutputWriter().Return(outputWriter)

cfg := config.GetK8sPluginConfig()
allow := false
Expand All @@ -378,7 +401,7 @@ func TestToK8sContainer(t *testing.T) {
}
assert.NoError(t, config.SetK8sPluginConfig(cfg))

container, err := ToK8sContainer(context.TODO(), taskContainer, nil, templateParameters)
container, err := ToK8sContainer(context.TODO(), tCtx)
assert.NoError(t, err)
assert.Equal(t, container.Image, "myimage")
assert.EqualValues(t, []string{
Expand Down
9 changes: 4 additions & 5 deletions go/tasks/pluginmachinery/flytek8s/copilot.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c
c.SecurityContext.Capabilities.Add = append(c.SecurityContext.Capabilities.Add, pTraceCapability)

if iFace != nil {
if iFace.Inputs != nil {
if iFace.Inputs != nil && len(iFace.Inputs.Variables) > 0 {
inPath := cfg.DefaultInputDataPath
if pilot.GetInputPath() != "" {
inPath = pilot.GetInputPath()
Expand All @@ -187,7 +187,7 @@ func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c
})
}

if iFace.Outputs != nil {
if iFace.Outputs != nil && len(iFace.Outputs.Variables) > 0 {
outPath := cfg.DefaultOutputPath
if pilot.GetOutputPath() != "" {
outPath = pilot.GetOutputPath()
Expand All @@ -210,7 +210,7 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
shareProcessNamespaceEnabled := true
coPilotPod.ShareProcessNamespace = &shareProcessNamespaceEnabled
if iFace != nil {
if iFace.Inputs != nil {
if iFace.Inputs != nil && len(iFace.Inputs.Variables) > 0 {
inPath := cfg.DefaultInputDataPath
if pilot.GetInputPath() != "" {
inPath = pilot.GetInputPath()
Expand Down Expand Up @@ -240,7 +240,7 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
coPilotPod.InitContainers = append(coPilotPod.InitContainers, downloader)
}

if iFace.Outputs != nil {
if iFace.Outputs != nil && len(iFace.Outputs.Variables) > 0 {
outPath := cfg.DefaultOutputPath
if pilot.GetOutputPath() != "" {
outPath = pilot.GetOutputPath()
Expand Down Expand Up @@ -268,7 +268,6 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
}
coPilotPod.Containers = append(coPilotPod.Containers, sidecar)
}

}

return nil
Expand Down
Loading

0 comments on commit ad8075c

Please sign in to comment.