Skip to content

Commit

Permalink
persisting k8s plugin phase, version, and reason (flyteorg#331)
Browse files Browse the repository at this point in the history
* using PhaseVersion to ensure Reason updates are sent

Signed-off-by: Daniel Rammer <[email protected]>

* refactored and fixed tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed linter

Signed-off-by: Daniel Rammer <[email protected]>

* added docs

Signed-off-by: Daniel Rammer <[email protected]>

* corrected missing old usecase

Signed-off-by: Daniel Rammer <[email protected]>

* actually this will work

Signed-off-by: Daniel Rammer <[email protected]>

* added missing increment on phase version - thanks yee

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Mar 27, 2023
1 parent 60d345d commit 18a594e
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 26 deletions.
10 changes: 10 additions & 0 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,16 @@ func (p PhaseInfo) Err() *core.ExecutionError {
return p.err
}

// WithVersion returns a new PhaseInfo whose version is set to the supplied value. The phase, info, err, and reason
// fields are carried over from the receiver; the receiver is passed by value and is not modified.
func (p PhaseInfo) WithVersion(version uint32) PhaseInfo {
	var updated PhaseInfo

	// Only the version differs from the receiver.
	updated.version = version

	updated.phase = p.phase
	updated.info = p.info
	updated.err = p.err
	updated.reason = p.reason

	return updated
}

func (p PhaseInfo) String() string {
if p.err != nil {
return fmt.Sprintf("Phase<%s:%d Error:%s>", p.phase, p.version, p.err)
Expand Down
34 changes: 34 additions & 0 deletions go/tasks/pluginmachinery/k8s/mocks/plugin_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions go/tasks/pluginmachinery/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ type PluginContext interface {

// Returns a handle to the Task's execution metadata.
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata

// Returns a reader that retrieves previously stored plugin internal state. The state itself is immutable.
PluginStateReader() pluginsCore.PluginStateReader
}

// PluginState defines the state of a k8s plugin. This information must be maintained between propeller evaluations to
// determine if there have been any updates since the previous evaluation.
type PluginState struct {
// Phase is the plugin phase.
Phase pluginsCore.Phase
// PhaseVersion is a number used to indicate reportable changes to state that have the same phase.
PhaseVersion uint32
// Reason is the message explaining the purpose for being in the reported state.
Reason string
}

// Defines a simplified interface to author plugins for k8s resources.
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,16 @@ func getMockTaskExecutionContext(ctx context.Context, parallelism int) *mocks.Ta
ReferenceConstructor: &storage.URLPathConstructor{},
}

pluginStateReader := &mocks.PluginStateReader{}
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil)

tCtx := &mocks.TaskExecutionContext{}
tCtx.OnTaskReader().Return(tr)
tCtx.OnTaskExecutionMetadata().Return(tMeta)
tCtx.OnOutputWriter().Return(ow)
tCtx.OnInputReader().Return(ir)
tCtx.OnDataStore().Return(dataStore)
tCtx.OnPluginStateReader().Return(pluginStateReader)
return tCtx
}

Expand Down
25 changes: 19 additions & 6 deletions go/tasks/plugins/k8s/pod/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func dummyContainerTaskContext(resources *v1.ResourceRequirements, command []str
taskCtx.OnTaskReader().Return(taskReader)

taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata)

pluginStateReader := &pluginsCoreMock.PluginStateReader{}
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil)
taskCtx.OnPluginStateReader().Return(pluginStateReader)

return taskCtx
}

Expand Down Expand Up @@ -140,28 +145,32 @@ func TestContainerTaskExecutor_BuildResource(t *testing.T) {
}

func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
command := []string{"command"}
args := []string{"{{.Input}}"}
taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args)

j := &v1.Pod{
Status: v1.PodStatus{},
}

ctx := context.TODO()
t.Run("running", func(t *testing.T) {
j.Status.Phase = v1.PodRunning
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRunning, phaseInfo.Phase())
})

t.Run("queued", func(t *testing.T) {
j.Status.Phase = v1.PodPending
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseQueued, phaseInfo.Phase())
})

t.Run("failNoCondition", func(t *testing.T) {
j.Status.Phase = v1.PodFailed
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
ec := phaseInfo.Err().GetCode()
Expand All @@ -177,7 +186,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {
Type: v1.PodReasonUnschedulable,
},
}
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.Equal(t, pluginsCore.PhaseRetryableFailure, phaseInfo.Phase())
ec := phaseInfo.Err().GetCode()
Expand All @@ -186,7 +195,7 @@ func TestContainerTaskExecutor_GetTaskStatus(t *testing.T) {

t.Run("success", func(t *testing.T) {
j.Status.Phase = v1.PodSucceeded
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, j)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, j)
assert.NoError(t, err)
assert.NotNil(t, phaseInfo)
assert.Equal(t, pluginsCore.PhaseSuccess, phaseInfo.Phase())
Expand All @@ -199,6 +208,10 @@ func TestContainerTaskExecutor_GetProperties(t *testing.T) {
}

func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) {
command := []string{"command"}
args := []string{"{{.Input}}"}
taskCtx := dummyContainerTaskContext(containerResourceRequirements, command, args)

ctx := context.TODO()
reason := "InvalidImageName"
message := "Failed to apply default image tag \"TEST/flyteorg/myapp:latest\": couldn't parse image reference" +
Expand Down Expand Up @@ -230,7 +243,7 @@ func TestContainerTaskExecutor_GetTaskStatus_InvalidImageName(t *testing.T) {

t.Run("failInvalidImageName", func(t *testing.T) {
pendingPod.Status.Phase = v1.PodPending
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, nil, pendingPod)
phaseInfo, err := DefaultPodPlugin.GetTaskPhase(ctx, taskCtx, pendingPod)
finalReason := fmt.Sprintf("|%s", reason)
finalMessage := fmt.Sprintf("|%s", message)
assert.NoError(t, err)
Expand Down
59 changes: 39 additions & 20 deletions go/tasks/plugins/k8s/pod/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ func (p plugin) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContex
}

func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.PluginContext, r client.Object, logPlugin tasklog.Plugin, logSuffix string) (pluginsCore.PhaseInfo, error) {
pluginState := k8s.PluginState{}
_, err := pluginContext.PluginStateReader().Get(&pluginState)
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
}

pod := r.(*v1.Pod)

transitionOccurredAt := flytek8s.GetLastTransitionOccurredAt(pod).Time
Expand All @@ -166,36 +172,49 @@ func (plugin) GetTaskPhaseWithLogs(ctx context.Context, pluginContext k8s.Plugin
info.Logs = taskLogs
}

phaseInfo := pluginsCore.PhaseInfoUndefined
switch pod.Status.Phase {
case v1.PodSucceeded:
return flytek8s.DemystifySuccess(pod.Status, info)
phaseInfo, err = flytek8s.DemystifySuccess(pod.Status, info)
case v1.PodFailed:
return flytek8s.DemystifyFailure(pod.Status, info)
phaseInfo, err = flytek8s.DemystifyFailure(pod.Status, info)
case v1.PodPending:
return flytek8s.DemystifyPending(pod.Status)
phaseInfo, err = flytek8s.DemystifyPending(pod.Status)
case v1.PodReasonUnschedulable:
return pluginsCore.PhaseInfoQueued(transitionOccurredAt, pluginsCore.DefaultPhaseVersion, "pod unschedulable"), nil
phaseInfo = pluginsCore.PhaseInfoQueued(transitionOccurredAt, pluginsCore.DefaultPhaseVersion, "pod unschedulable")
case v1.PodUnknown:
return pluginsCore.PhaseInfoUndefined, nil
}

primaryContainerName, exists := r.GetAnnotations()[flytek8s.PrimaryContainerKey]
if !exists {
// if the primary container annotation does not exist, then the task requires all containers
// to succeed to declare success. therefore, if the pod is not in one of the above states we
// fallback to declaring the task as 'running'.
if len(info.Logs) > 0 {
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, &info), nil
// DO NOTHING
default:
primaryContainerName, exists := r.GetAnnotations()[flytek8s.PrimaryContainerKey]
if !exists {
// if the primary container annotation does not exist, then the task requires all containers
// to succeed to declare success. therefore, if the pod is not in one of the above states we
// fallback to declaring the task as 'running'.
phaseInfo = pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info)
if len(info.Logs) > 0 {
phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1)
}
} else {
// if the primary container annotation exists, we use the status of the specified container
phaseInfo = flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info)
if phaseInfo.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 {
phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1)
}
}
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, &info), nil
}

// if the primary container annotation exists, we use the status of the specified container
primaryContainerPhase := flytek8s.DeterminePrimaryContainerPhase(primaryContainerName, pod.Status.ContainerStatuses, &info)
if primaryContainerPhase.Phase() == pluginsCore.PhaseRunning && len(info.Logs) > 0 {
return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion+1, primaryContainerPhase.Info()), nil
if err != nil {
return pluginsCore.PhaseInfoUndefined, err
} else if phaseInfo.Phase() != pluginsCore.PhaseRunning && phaseInfo.Phase() == pluginState.Phase &&
phaseInfo.Version() <= pluginState.PhaseVersion && phaseInfo.Reason() != pluginState.Reason {

// if we have the same Phase as the previous evaluation and updated the Reason but not the PhaseVersion we must
// update the PhaseVersion so an event is sent to reflect the Reason update. this does not handle the Running
// Phase because the legacy used `DefaultPhaseVersion + 1` which will only increment to 1.
phaseInfo = phaseInfo.WithVersion(pluginState.PhaseVersion + 1)
}
return primaryContainerPhase, nil

return phaseInfo, err
}

func (plugin) GetProperties() k8s.PluginProperties {
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/plugins/k8s/pod/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func getDummySidecarTaskContext(taskTemplate *core.TaskTemplate, resources *v1.R

taskCtx.OnTaskExecutionMetadata().Return(dummyTaskMetadata)

pluginStateReader := &pluginsCoreMock.PluginStateReader{}
pluginStateReader.OnGetMatch(mock.Anything).Return(0, nil)
taskCtx.OnPluginStateReader().Return(pluginStateReader)

return taskCtx
}

Expand Down

0 comments on commit 18a594e

Please sign in to comment.