Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:flyteorg/flyteplugins into dbx-bug-1
Browse files Browse the repository at this point in the history
  • Loading branch information
pingsutw committed Mar 27, 2023
2 parents ad8075c + 18a594e commit c79ed62
Show file tree
Hide file tree
Showing 15 changed files with 263 additions and 34 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v1.3.12
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flytestdlib v1.0.15
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/flyteidl v1.3.12 h1:RTcxCrqKU235cWuy+j3gkmqPJOaaYEcJaT6fsRjoS8Q=
github.com/flyteorg/flyteidl v1.3.12/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
17 changes: 15 additions & 2 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ type ExternalResource struct {
type TaskInfo struct {
// log information for the task execution
Logs []*core.TaskLog
// Set this value to the intended time when the status occurred at. If not provided, will be defaulted to the current
// time at the time of publishing the event.
// This value represents the time the status occurred at. If not provided, it will be defaulted to the time Flyte
// checked the task status.
OccurredAt *time.Time
// This value represents the time the status was reported at. If not provided, will be defaulted to the current time
// when Flyte published the event.
ReportedAt *time.Time
// Custom Event information that the plugin would like to expose to the front-end
CustomInfo *structpb.Struct
// A collection of information about external resources launched by this task
Expand Down Expand Up @@ -136,6 +139,16 @@ func (p PhaseInfo) Err() *core.ExecutionError {
return p.err
}

func (p PhaseInfo) WithVersion(version uint32) PhaseInfo {
return PhaseInfo{
phase: p.phase,
version: version,
info: p.info,
err: p.err,
reason: p.reason,
}
}

func (p PhaseInfo) String() string {
if p.err != nil {
return fmt.Sprintf("Phase<%s:%d Error:%s>", p.phase, p.version, p.err)
Expand Down
19 changes: 16 additions & 3 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,13 +664,13 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {
var lastTransitionTime metav1.Time
containerStatuses := append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...)
for _, containerStatus := range containerStatuses {
if r := containerStatus.LastTerminationState.Running; r != nil {
if r := containerStatus.State.Running; r != nil {
if r.StartedAt.Unix() > lastTransitionTime.Unix() {
lastTransitionTime = r.StartedAt
}
} else if r := containerStatus.LastTerminationState.Terminated; r != nil {
} else if r := containerStatus.State.Terminated; r != nil {
if r.FinishedAt.Unix() > lastTransitionTime.Unix() {
lastTransitionTime = r.StartedAt
lastTransitionTime = r.FinishedAt
}
}
}
Expand All @@ -681,3 +681,16 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {

return lastTransitionTime
}

func GetReportedAt(pod *v1.Pod) metav1.Time {
var reportedAt metav1.Time
for _, condition := range pod.Status.Conditions {
if condition.Reason == "PodCompleted" && condition.Type == v1.PodReady && condition.Status == v1.ConditionFalse {
if condition.LastTransitionTime.Unix() > reportedAt.Unix() {
reportedAt = condition.LastTransitionTime
}
}
}

return reportedAt
}
58 changes: 58 additions & 0 deletions go/tasks/pluginmachinery/k8s/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package k8s

import (
"fmt"
"io/ioutil"

"github.com/pkg/errors"
restclient "k8s.io/client-go/rest"
)

type ClusterConfig struct {
Name string `json:"name" pflag:",Friendly name of the remote cluster"`
Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"`
Auth Auth `json:"auth" pflag:"-, Auth setting for the cluster"`
Enabled bool `json:"enabled" pflag:", Boolean flag to enable or disable"`
}

type Auth struct {
TokenPath string `json:"tokenPath" pflag:", Token path"`
CaCertPath string `json:"caCertPath" pflag:", Certificate path"`
}

func (auth Auth) GetCA() ([]byte, error) {
cert, err := ioutil.ReadFile(auth.CaCertPath)
if err != nil {
return nil, errors.Wrap(err, "failed to read k8s CA cert from configured path")
}
return cert, nil
}

func (auth Auth) GetToken() (string, error) {
token, err := ioutil.ReadFile(auth.TokenPath)
if err != nil {
return "", errors.Wrap(err, "failed to read k8s bearer token from configured path")
}
return string(token), nil
}

// KubeClientConfig ...
func KubeClientConfig(host string, auth Auth) (*restclient.Config, error) {
tokenString, err := auth.GetToken()
if err != nil {
return nil, errors.New(fmt.Sprintf("Failed to get auth token: %+v", err))
}

caCert, err := auth.GetCA()
if err != nil {
return nil, errors.New(fmt.Sprintf("Failed to get auth CA: %+v", err))
}

tlsClientConfig := restclient.TLSClientConfig{}
tlsClientConfig.CAData = caCert
return &restclient.Config{
Host: host,
TLSClientConfig: tlsClientConfig,
BearerToken: tokenString,
}, nil
}
34 changes: 34 additions & 0 deletions go/tasks/pluginmachinery/k8s/mocks/plugin_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions go/tasks/pluginmachinery/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ type PluginContext interface {

// Returns a handle to the Task's execution metadata.
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata

// Returns a reader that retrieves previously stored plugin internal state. the state itself is immutable
PluginStateReader() pluginsCore.PluginStateReader
}

// PluginState defines the state of a k8s plugin. This information must be maintained between propeller evaluations to
// determine if there have been any updates since the previously evaluation.
type PluginState struct {
// Phase is the plugin phase.
Phase pluginsCore.Phase
// PhaseVersion is an number used to indicate reportable changes to state that have the same phase.
PhaseVersion uint32
// Reason is the message explaining the purpose for being in the reported state.
Reason string
}

// Defines a simplified interface to author plugins for k8s resources.
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,16 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta
ReferenceConstructor: &storage.URLPathConstructor{},
}

pluginStateReader := &mocks.PluginStateReader{}
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil)

tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskReader().Return(tr)
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnOutputWriter().Return(ow)
tCtx.OnInputReader().Return(ir)
tCtx.OnDataStore().Return(dataStore)
tCtx.OnPluginStateReader().Return(pluginStateReader)
return tCtx
}

Expand Down
25 changes: 19 additions & 6 deletions go/tasks/plugins/k8s/pod/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []str
taskCtx.OnTaskReader().Return(taskReader)

taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata)

pluginStateReader := &pluginsCoreMock.PluginStateReader{}
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil)
taskCtx.OnPluginStateReader().Return(pluginStateReader)

return taskCtx
}

Expand Down Expand Up @@ -140,28 +145,32 @@ func TestContainerTaskExecutor_BuildResource(t *testing.T) {
}

func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
command := []string{"command"}
args := []string{"{{.Input}}"}
taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args)

j := &v1.Pod{
Status: v1.PodStatus{},
}

ctx := context.TODO()
t.Run("running", func(t *testing.T) {
j.Status.Phase = v1.PodRunning
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase())
})

t.Run("queued", func(t *testing.T) {
j.Status.Phase = v1.PodPending
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, phaseInfo.Phase())
})

t.Run("failNoCondition", func(t *testing.T) {
j.Status.Phase = v1.PodFailed
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
ec := phaseInfo.Err().GetCode()
Expand All @@ -177,7 +186,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
Type: v1.PodReasonUnschedulable,
},
}
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
ec := phaseInfo.Err().GetCode()
Expand All @@ -186,7 +195,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {

t.Run("success", func(t *testing.T) {
j.Status.Phase = v1.PodSucceeded
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.NotNil(t, phaseInfo)
assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase())
Expand All @@ -199,6 +208,10 @@ func TestContainerTaskExecutor_GetProperties(t *testing.T) {
}

func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) {
command := []string{"command"}
args := []string{"{{.Input}}"}
taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args)

ctx := context.TODO()
reason := "InvalidImageName"
message := "Failed to apply default image tag \"TEST/flyteorg/myapp:latest\": couldn't parse image reference" +
Expand Down Expand Up @@ -230,7 +243,7 @@ func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) {

t.Run("failInvalidImageName", func(t *testing.T) {
pendingPod.Status.Phase = v1.PodPending
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, pendingPod)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, pendingPod)
finalReason := fmt.Sprintf("|%s", reason)
finalMessage := fmt.Sprintf("|%s", message)
assert.NoError(t, err)
Expand Down
Loading

0 comments on commit c79ed62

Please sign in to comment.