From 0a681cd1a9e2c6586e6408532767539be65737f4 Mon Sep 17 00:00:00 2001 From: Abdullah Mobeen Date: Mon, 20 Mar 2023 12:19:45 -0400 Subject: [PATCH 1/3] feat: add support for inter-cluster communication for Ray plugin (#321) * feat: add inter-cluster com to ray plugin Signed-off-by: Abdullah Mobeen * test: add tests for remote cluster Signed-off-by: Abdullah Mobeen * refactor: move auth to config Signed-off-by: Abdullah Mobeen * fix description Co-authored-by: Kevin Su Signed-off-by: Abdullah Mobeen * refactor: move clusterconfig and auth to pluginmachinery k8s Signed-off-by: Abdullah Mobeen * refactor: move clusterconfig and auth to pluginmachinery k8s Signed-off-by: Abdullah Mobeen * chore: remove commented out code Signed-off-by: Abdullah Mobeen --------- Signed-off-by: Abdullah Mobeen Signed-off-by: Abdullah Mobeen Co-authored-by: Kevin Su --- go/tasks/pluginmachinery/k8s/config.go | 58 +++++++++++++++++++ go/tasks/plugins/k8s/ray/config.go | 4 ++ go/tasks/plugins/k8s/ray/config_test.go | 26 +++++++++ go/tasks/plugins/k8s/ray/ray.go | 14 +++++ go/tasks/plugins/k8s/ray/testdata/config.yaml | 7 +++ 5 files changed, 109 insertions(+) create mode 100644 go/tasks/pluginmachinery/k8s/config.go create mode 100644 go/tasks/plugins/k8s/ray/config_test.go create mode 100644 go/tasks/plugins/k8s/ray/testdata/config.yaml diff --git a/go/tasks/pluginmachinery/k8s/config.go b/go/tasks/pluginmachinery/k8s/config.go new file mode 100644 index 000000000..43ad68227 --- /dev/null +++ b/go/tasks/pluginmachinery/k8s/config.go @@ -0,0 +1,58 @@ +package k8s + +import ( + "fmt" + "io/ioutil" + + "github.com/pkg/errors" + restclient "k8s.io/client-go/rest" +) + +type ClusterConfig struct { + Name string `json:"name" pflag:",Friendly name of the remote cluster"` + Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"` + Auth Auth `json:"auth" pflag:"-, Auth setting for the cluster"` + Enabled bool `json:"enabled" pflag:", Boolean flag to enable or disable"` +} + +type 
Auth struct { + TokenPath string `json:"tokenPath" pflag:", Token path"` + CaCertPath string `json:"caCertPath" pflag:", Certificate path"` +} + +func (auth Auth) GetCA() ([]byte, error) { + cert, err := ioutil.ReadFile(auth.CaCertPath) + if err != nil { + return nil, errors.Wrap(err, "failed to read k8s CA cert from configured path") + } + return cert, nil +} + +func (auth Auth) GetToken() (string, error) { + token, err := ioutil.ReadFile(auth.TokenPath) + if err != nil { + return "", errors.Wrap(err, "failed to read k8s bearer token from configured path") + } + return string(token), nil +} + +// KubeClientConfig builds a REST client config for host, using the bearer token and CA certificate read from the paths in auth. +func KubeClientConfig(host string, auth Auth) (*restclient.Config, error) { + tokenString, err := auth.GetToken() + if err != nil { + return nil, errors.New(fmt.Sprintf("Failed to get auth token: %+v", err)) + } + + caCert, err := auth.GetCA() + if err != nil { + return nil, errors.New(fmt.Sprintf("Failed to get auth CA: %+v", err)) + } + + tlsClientConfig := restclient.TLSClientConfig{} + tlsClientConfig.CAData = caCert + return &restclient.Config{ + Host: host, + TLSClientConfig: tlsClientConfig, + BearerToken: tokenString, + }, nil +} diff --git a/go/tasks/plugins/k8s/ray/config.go b/go/tasks/plugins/k8s/ray/config.go index e141708ab..8b699cd79 100644 --- a/go/tasks/plugins/k8s/ray/config.go +++ b/go/tasks/plugins/k8s/ray/config.go @@ -2,6 +2,7 @@ package ray import ( pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config" + pluginmachinery "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" ) //go:generate pflags Config --default-var=defaultConfig @@ -40,6 +41,9 @@ type Config struct { // NodeIPAddress the IP address of the head node. By default, this is pod ip address. 
NodeIPAddress string `json:"nodeIPAddress,omitempty"` + + // Remote Ray Cluster Config + RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` } func GetConfig() *Config { diff --git a/go/tasks/plugins/k8s/ray/config_test.go b/go/tasks/plugins/k8s/ray/config_test.go new file mode 100644 index 000000000..4fc005cce --- /dev/null +++ b/go/tasks/plugins/k8s/ray/config_test.go @@ -0,0 +1,26 @@ +package ray + +import ( + "testing" + + pluginmachinery "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" + "gotest.tools/assert" +) + +func TestLoadConfig(t *testing.T) { + rayConfig := GetConfig() + assert.Assert(t, rayConfig != nil) + + t.Run("remote cluster", func(t *testing.T) { + config := GetConfig() + remoteConfig := pluginmachinery.ClusterConfig{ + Enabled: false, + Endpoint: "", + Auth: pluginmachinery.Auth{ + TokenPath: "", + CaCertPath: "", + }, + } + assert.DeepEqual(t, config.RemoteClusterConfig, remoteConfig) + }) +} diff --git a/go/tasks/plugins/k8s/ray/ray.go b/go/tasks/plugins/k8s/ray/ray.go index d7e274250..614d1af9f 100644 --- a/go/tasks/plugins/k8s/ray/ray.go +++ b/go/tasks/plugins/k8s/ray/ray.go @@ -371,5 +371,19 @@ func init() { ResourceToWatch: &rayv1alpha1.RayJob{}, Plugin: rayJobResourceHandler{}, IsDefault: false, + CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) { + remoteConfig := GetConfig().RemoteClusterConfig + if !remoteConfig.Enabled { + // use controller-runtime KubeClient + return nil, nil + } + + kubeConfig, err := k8s.KubeClientConfig(remoteConfig.Endpoint, remoteConfig.Auth) + if err != nil { + return nil, err + } + + return k8s.NewDefaultKubeClient(kubeConfig) + }, }) } diff --git a/go/tasks/plugins/k8s/ray/testdata/config.yaml b/go/tasks/plugins/k8s/ray/testdata/config.yaml new file mode 100644 index 000000000..cc5ddb5c3 --- /dev/null +++ b/go/tasks/plugins/k8s/ray/testdata/config.yaml @@ -0,0 +1,7 @@ +plugins: + ray: 
+ remoteClusterConfig: + endpoint: 127.0.0.1 + auth: + tokenPath: /path/token + caCertPath: /path/cert \ No newline at end of file From 60d345d49f3050170b5283ec332a9d8facf3b952 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Thu, 23 Mar 2023 08:44:52 -0500 Subject: [PATCH 2/3] Fixing pod plugin event reporting timestamps (#307) * corrected timestamps for pod plugin Signed-off-by: Dan Rammer * but actually this time Signed-off-by: Dan Rammer * added reported at support Signed-off-by: Daniel Rammer * fixed merge Signed-off-by: Daniel Rammer * updated flyteidl Signed-off-by: Daniel Rammer * updated flyteidl deps Signed-off-by: Daniel Rammer --------- Signed-off-by: Dan Rammer Signed-off-by: Daniel Rammer --- go.mod | 2 +- go.sum | 4 ++-- go/tasks/pluginmachinery/core/phase.go | 7 +++++-- .../pluginmachinery/flytek8s/pod_helper.go | 19 ++++++++++++++++--- go/tasks/plugins/k8s/pod/plugin.go | 6 ++++++ 5 files changed, 30 insertions(+), 8 deletions(-) diff --git a/go.mod b/go.mod index e06d9fa50..4d54cbdea 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/athena v1.0.0 github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0 github.com/coocood/freecache v1.1.1 - github.com/flyteorg/flyteidl v1.3.12 + github.com/flyteorg/flyteidl v1.3.14 github.com/flyteorg/flytestdlib v1.0.15 github.com/go-test/deep v1.0.7 github.com/golang/protobuf v1.5.2 diff --git a/go.sum b/go.sum index 209c816b5..179728dec 100644 --- a/go.sum +++ b/go.sum @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/flyteorg/flyteidl v1.3.12 h1:RTcxCrqKU235cWuy+j3gkmqPJOaaYEcJaT6fsRjoS8Q= -github.com/flyteorg/flyteidl v1.3.12/go.mod 
h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= +github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8= +github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM= github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0= github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s= github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index 9cfdbe2ba..fd8128f3b 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -86,9 +86,12 @@ type ExternalResource struct { type TaskInfo struct { // log information for the task execution Logs []*core.TaskLog - // Set this value to the intended time when the status occurred at. If not provided, will be defaulted to the current - // time at the time of publishing the event. + // This value represents the time the status occurred at. If not provided, it will be defaulted to the time Flyte + // checked the task status. OccurredAt *time.Time + // This value represents the time the status was reported at. If not provided, will be defaulted to the current time + // when Flyte published the event. 
+ ReportedAt *time.Time // Custom Event information that the plugin would like to expose to the front-end CustomInfo *structpb.Struct // A collection of information about external resources launched by this task diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index 6297b07c3..ee26ce4dc 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -664,13 +664,13 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time { var lastTransitionTime metav1.Time containerStatuses := append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) for _, containerStatus := range containerStatuses { - if r := containerStatus.LastTerminationState.Running; r != nil { + if r := containerStatus.State.Running; r != nil { if r.StartedAt.Unix() > lastTransitionTime.Unix() { lastTransitionTime = r.StartedAt } - } else if r := containerStatus.LastTerminationState.Terminated; r != nil { + } else if r := containerStatus.State.Terminated; r != nil { if r.FinishedAt.Unix() > lastTransitionTime.Unix() { - lastTransitionTime = r.StartedAt + lastTransitionTime = r.FinishedAt } } } @@ -681,3 +681,16 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time { return lastTransitionTime } + +func GetReportedAt(pod *v1.Pod) metav1.Time { + var reportedAt metav1.Time + for _, condition := range pod.Status.Conditions { + if condition.Reason == "PodCompleted" && condition.Type == v1.PodReady && condition.Status == v1.ConditionFalse { + if condition.LastTransitionTime.Unix() > reportedAt.Unix() { + reportedAt = condition.LastTransitionTime + } + } + } + + return reportedAt +} diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index 33f69b3c2..e4b4ac867 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -148,8 +148,14 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext 
k8s.Plugin pod := r.(*v1.Pod) transitionOccurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time + reportedAt := flytek8s.GetReportedAt(pod).Time + if reportedAt.IsZero() { + reportedAt = transitionOccurredAt + } + info := pluginsCore.TaskInfo{ OccurredAt: &transitionOccurredAt, + ReportedAt: &reportedAt, } if pod.Status.Phase != v1.PodPending && pod.Status.Phase != v1.PodUnknown { From 18a594ecef8bf744b42d9a824df2d53635aa31d2 Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Mon, 27 Mar 2023 18:04:37 -0500 Subject: [PATCH 3/3] persisting k8s plugin phase, version, and reason (#331) * using PhaseVersion to ensure Reason updates are sent Signed-off-by: Daniel Rammer * refactored and fixed tests Signed-off-by: Daniel Rammer * fixed linter Signed-off-by: Daniel Rammer * added docs Signed-off-by: Daniel Rammer * corrected missing old usecase Signed-off-by: Daniel Rammer * actually this will work Signed-off-by: Daniel Rammer * added missing increment on phsae version - thanks yee Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- go/tasks/pluginmachinery/core/phase.go | 10 ++++ .../k8s/mocks/plugin_context.go | 34 +++++++++++ go/tasks/pluginmachinery/k8s/plugin.go | 14 +++++ go/tasks/plugins/array/k8s/management_test.go | 4 ++ go/tasks/plugins/k8s/pod/container_test.go | 25 ++++++-- go/tasks/plugins/k8s/pod/plugin.go | 59 ++++++++++++------- go/tasks/plugins/k8s/pod/sidecar_test.go | 4 ++ 7 files changed, 124 insertions(+), 26 deletions(-) diff --git a/go/tasks/pluginmachinery/core/phase.go b/go/tasks/pluginmachinery/core/phase.go index fd8128f3b..5b9a7e110 100644 --- a/go/tasks/pluginmachinery/core/phase.go +++ b/go/tasks/pluginmachinery/core/phase.go @@ -139,6 +139,16 @@ func (p PhaseInfo) Err() *core.ExecutionError { return p.err } +func (p PhaseInfo) WithVersion(version uint32) PhaseInfo { + return PhaseInfo{ + phase: p.phase, + version: version, + info: p.info, + err: p.err, + reason: p.reason, + } +} + func (p PhaseInfo) String() string { if 
p.err != nil { return fmt.Sprintf("Phase<%s:%d Error:%s>", p.phase, p.version, p.err) diff --git a/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go b/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go index 10e969a03..e77ab3920 100644 --- a/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go +++ b/go/tasks/pluginmachinery/k8s/mocks/plugin_context.go @@ -150,6 +150,40 @@ func (_m *PluginContext) OutputWriter() io.OutputWriter { return r0 } +type PluginContext_PluginStateReader struct { + *mock.Call +} + +func (_m PluginContext_PluginStateReader) Return(_a0 core.PluginStateReader) *PluginContext_PluginStateReader { + return &PluginContext_PluginStateReader{Call: _m.Call.Return(_a0)} +} + +func (_m *PluginContext) OnPluginStateReader() *PluginContext_PluginStateReader { + c_call := _m.On("PluginStateReader") + return &PluginContext_PluginStateReader{Call: c_call} +} + +func (_m *PluginContext) OnPluginStateReaderMatch(matchers ...interface{}) *PluginContext_PluginStateReader { + c_call := _m.On("PluginStateReader", matchers...) + return &PluginContext_PluginStateReader{Call: c_call} +} + +// PluginStateReader provides a mock function with given fields: +func (_m *PluginContext) PluginStateReader() core.PluginStateReader { + ret := _m.Called() + + var r0 core.PluginStateReader + if rf, ok := ret.Get(0).(func() core.PluginStateReader); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(core.PluginStateReader) + } + } + + return r0 +} + type PluginContext_TaskExecutionMetadata struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/k8s/plugin.go b/go/tasks/pluginmachinery/k8s/plugin.go index b94ef114b..d8d99ee82 100644 --- a/go/tasks/pluginmachinery/k8s/plugin.go +++ b/go/tasks/pluginmachinery/k8s/plugin.go @@ -68,6 +68,20 @@ type PluginContext interface { // Returns a handle to the Task's execution metadata. 
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata + + // Returns a reader that retrieves previously stored plugin internal state. The state itself is immutable. + PluginStateReader() pluginsCore.PluginStateReader +} + +// PluginState defines the state of a k8s plugin. This information must be maintained between propeller evaluations to +// determine if there have been any updates since the previous evaluation. +type PluginState struct { + // Phase is the plugin phase. + Phase pluginsCore.Phase + // PhaseVersion is a number used to indicate reportable changes to state that have the same phase. + PhaseVersion uint32 + // Reason is the message explaining the purpose for being in the reported state. + Reason string } // Defines a simplified interface to author plugins for k8s resources. diff --git a/go/tasks/plugins/array/k8s/management_test.go b/go/tasks/plugins/array/k8s/management_test.go index b4e7d381d..fc422132b 100644 --- a/go/tasks/plugins/array/k8s/management_test.go +++ b/go/tasks/plugins/array/k8s/management_test.go @@ -129,12 +129,16 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta ReferenceConstructor: &storage.URLPathConstructor{}, } + pluginStateReader := &mocks.PluginStateReader{} + pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil) + tCtx := &mocks.TaskExecutionContext{} tCtx.OnTaskReader().Return(tr) tCtx.OnTaskExecutionMetadata().Return(tMeta) tCtx.OnOutputWriter().Return(ow) tCtx.OnInputReader().Return(ir) tCtx.OnDataStore().Return(dataStore) + tCtx.OnPluginStateReader().Return(pluginStateReader) return tCtx } diff --git a/go/tasks/plugins/k8s/pod/container_test.go b/go/tasks/plugins/k8s/pod/container_test.go index d4b907915..96872f115 100644 --- a/go/tasks/plugins/k8s/pod/container_test.go +++ b/go/tasks/plugins/k8s/pod/container_test.go @@ -102,6 +102,11 @@ func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []str taskCtx.OnTaskReader().Return(taskReader) 
taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata) + + pluginStateReader := &pluginsCoreMock.PluginStateReader{} + pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil) + taskCtx.OnPluginStateReader().Return(pluginStateReader) + return taskCtx } @@ -140,6 +145,10 @@ func TestContainerTaskExecutor_BuildResource(t *testing.T) { } func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) { + command := []string{"command"} + args := []string{"{{.Input}}"} + taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args) + j := &v1.Pod{ Status: v1.PodStatus{}, } @@ -147,21 +156,21 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) { ctx := context.TODO() t.Run("running", func(t *testing.T) { j.Status.Phase = v1.PodRunning - phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j) + phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase()) }) t.Run("queued", func(t *testing.T) { j.Status.Phase = v1.PodPending - phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j) + phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseQueued, phaseInfo.Phase()) }) t.Run("failNoCondition", func(t *testing.T) { j.Status.Phase = v1.PodFailed - phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j) + phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase()) ec := phaseInfo.Err().GetCode() @@ -177,7 +186,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) { Type: v1.PodReasonUnschedulable, }, } - phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j) + phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j) assert.NoError(t, err) assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase()) ec := phaseInfo.Err().GetCode() @@ -186,7 +195,7 @@ 
func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) { t.Run("success", func(t *testing.T) { j.Status.Phase = v1.PodSucceeded - phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j) + phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j) assert.NoError(t, err) assert.NotNil(t, phaseInfo) assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase()) @@ -199,6 +208,10 @@ func TestContainerTaskExecutor_GetProperties(t *testing.T) { } func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) { + command := []string{"command"} + args := []string{"{{.Input}}"} + taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args) + ctx := context.TODO() reason := "InvalidImageName" message := "Failed to apply default image tag \"TEST/flyteorg/myapp:latest\": couldn't parse image reference" + @@ -230,7 +243,7 @@ func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) { t.Run("failInvalidImageName", func(t *testing.T) { pendingPod.Status.Phase = v1.PodPending - phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, pendingPod) + phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, pendingPod) finalReason := fmt.Sprintf("|%s", reason) finalMessage := fmt.Sprintf("|%s", message) assert.NoError(t, err) diff --git a/go/tasks/plugins/k8s/pod/plugin.go b/go/tasks/plugins/k8s/pod/plugin.go index e4b4ac867..d1c509fb7 100644 --- a/go/tasks/plugins/k8s/pod/plugin.go +++ b/go/tasks/plugins/k8s/pod/plugin.go @@ -145,6 +145,12 @@ func (p plugin) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContex } func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.PluginContext, r client.Object, logPlugin tasklog.Plugin, logSuffix string) (pluginsCore.PhaseInfo, error) { + pluginState := k8s.PluginState{} + _, err := pluginContext.PluginStateReader().Get(&pluginState) + if err != nil { + return pluginsCore.PhaseInfoUndefined, err + } + pod := r.(*v1.Pod) transitionOccurredAt := 
flytek8s.GetLastTransitionOccurredAt(pod).Time @@ -166,36 +172,49 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin info.Logs = taskLogs } + phaseInfo := pluginsCore.PhaseInfoUndefined switch pod.Status.Phase { case v1.PodSucceeded: - return flytek8s.DemystifySuccess(pod.Status, info) + phaseInfo, err = flytek8s.DemystifySuccess(pod.Status, info) case v1.PodFailed: - return flytek8s.DemystifyFailure(pod.Status, info) + phaseInfo, err = flytek8s.DemystifyFailure(pod.Status, info) case v1.PodPending: - return flytek8s.DemystifyPending(pod.Status) + phaseInfo, err = flytek8s.DemystifyPending(pod.Status) case v1.PodReasonUnschedulable: - return pluginsCore.PhaseInfoQueued(transitionOccurredAt, pluginsCore.DefaultPhaseVersion, "pod unschedulable"), nil + phaseInfo = pluginsCore.PhaseInfoQueued(transitionOccurredAt, pluginsCore.DefaultPhaseVersion, "pod unschedulable") case v1.PodUnknown: - return pluginsCore.PhaseInfoUndefined, nil - } - - primaryContainerName, exists := r.GetAnnotations()[flytek8s.PrimaryContainerKey] - if !exists { - // if the primary container annotation dos not exist, then the task requires all containers - // to succeed to declare success. therefore, if the pod is not in one of the above states we - // fallback to declaring the task as 'running'. - if len(info.Logs) > 0 { - return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, &info), nil + // DO NOTHING + default: + primaryContainerName, exists := r.GetAnnotations()[flytek8s.PrimaryContainerKey] + if !exists { + // if the primary container annotation does not exist, then the task requires all containers + // to succeed to declare success. therefore, if the pod is not in one of the above states we + // fall back to declaring the task as 'running'. 
+ phaseInfo = pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info) + if len(info.Logs) > 0 { + phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1) + } + } else { + // if the primary container annotation exists, we use the status of the specified container + phaseInfo = flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info) + if phaseInfo.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { + phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1) + } } - return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info), nil } - // if the primary container annotation exists, we use the status of the specified container - primaryContainerPhase := flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info) - if primaryContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 { - return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, primaryContainerPhase.Info()), nil + if err != nil { + return pluginsCore.PhaseInfoUndefined, err + } else if phaseInfo.Phase() != pluginsCore.PhaseRunning && phaseInfo.Phase() == pluginState.Phase && + phaseInfo.Version() <= pluginState.PhaseVersion && phaseInfo.Reason() != pluginState.Reason { + + // if we have the same Phase as the previous evaluation and updated the Reason but not the PhaseVersion we must + // update the PhaseVersion so an event is sent to reflect the Reason update. this does not handle the Running + // Phase because the legacy used `DefaultPhaseVersion + 1` which will only increment to 1. 
+ phaseInfo = phaseInfo.WithVersion(pluginState.PhaseVersion + 1) } - return primaryContainerPhase, nil + + return phaseInfo, err } func (plugin) GetProperties() k8s.PluginProperties { diff --git a/go/tasks/plugins/k8s/pod/sidecar_test.go b/go/tasks/plugins/k8s/pod/sidecar_test.go index ecb0e36f8..1dc3ac9f1 100644 --- a/go/tasks/plugins/k8s/pod/sidecar_test.go +++ b/go/tasks/plugins/k8s/pod/sidecar_test.go @@ -120,6 +120,10 @@ func getDummySidecarTaskContext(taskTemplate *core.TaskTemplate, resources *v1.R taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata) + pluginStateReader := &pluginsCoreMock.PluginStateReader{} + pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil) + taskCtx.OnPluginStateReader().Return(pluginStateReader) + return taskCtx }