Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into bug/delete-image-pull-backoff
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw committed Apr 18, 2023
2 parents b113267 + f5f4182 commit 113947f
Show file tree
Hide file tree
Showing 26 changed files with 455 additions and 170 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0
github.com/coocood/freecache v1.1.1
github.com/flyteorg/flyteidl v1.3.12
github.com/flyteorg/flyteidl v1.3.14
github.com/flyteorg/flytestdlib v1.0.15
github.com/go-test/deep v1.0.7
github.com/golang/protobuf v1.5.2
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,8 @@ github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQL
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/flyteorg/flyteidl v1.3.12 h1:RTcxCrqKU235cWuy+j3gkmqPJOaaYEcJaT6fsRjoS8Q=
github.com/flyteorg/flyteidl v1.3.12/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteidl v1.3.14 h1:o5M0g/r6pXTPu5PEurbYxbQmuOu3hqqsaI2M6uvK0N8=
github.com/flyteorg/flyteidl v1.3.14/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
17 changes: 15 additions & 2 deletions go/tasks/pluginmachinery/core/phase.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,12 @@ type ExternalResource struct {
type TaskInfo struct {
// log information for the task execution
Logs []*core.TaskLog
// Set this value to the intended time when the status occurred at. If not provided, will be defaulted to the current
// time at the time of publishing the event.
// This value represents the time the status occurred at. If not provided, it will be defaulted to the time Flyte
// checked the task status.
OccurredAt *time.Time
// This value represents the time the status was reported at. If not provided, will be defaulted to the current time
// when Flyte published the event.
ReportedAt *time.Time
// Custom Event information that the plugin would like to expose to the front-end
CustomInfo *structpb.Struct
// A collection of information about external resources launched by this task
Expand Down Expand Up @@ -144,6 +147,16 @@ func (p PhaseInfo) CleanupOnFailure() bool {
return p.cleanupOnFailure
}

func (p PhaseInfo) WithVersion(version uint32) PhaseInfo {
return PhaseInfo{
phase: p.phase,
version: version,
info: p.info,
err: p.err,
reason: p.reason,
}
}

func (p PhaseInfo) String() string {
if p.err != nil {
return fmt.Sprintf("Phase<%s:%d Error:%s>", p.phase, p.version, p.err)
Expand Down
1 change: 1 addition & 0 deletions go/tasks/pluginmachinery/core/transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const (
// The transition is eventually consistent. For all the state written may not be visible in the next call, but eventually will persist
// Best to use when the plugin logic is completely idempotent. This is also the most performant option.
TransitionTypeEphemeral TransitionType = iota
// @deprecated support for Barrier type transitions has been deprecated
// This transition tries its best to make the latest state visible for every consecutive read. But, it is possible
// to go back in time, i.e. monotonic consistency is violated (in rare cases).
TransitionTypeBarrier
Expand Down
8 changes: 7 additions & 1 deletion go/tasks/pluginmachinery/flytek8s/container_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,19 @@ func BuildRawContainer(ctx context.Context, taskContainer *core.Container, taskE
containerName = rand.String(4)
}

res, err := ToK8sResourceRequirements(taskContainer.Resources)
if err != nil {
return nil, err
}

container := &v1.Container{
Name: containerName,
Image: taskContainer.GetImage(),
Args: taskContainer.GetArgs(),
Command: taskContainer.GetCommand(),
Env: ToK8sEnvVar(taskContainer.GetEnv()),
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
Resources: *res,
}

return container, nil
Expand Down Expand Up @@ -251,7 +257,7 @@ func ToK8sContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext)
Task: tCtx.TaskReader(),
}

if err := AddFlyteCustomizationsToContainer(ctx, templateParameters, ResourceCustomizationModeAssignResources, container); err != nil {
if err := AddFlyteCustomizationsToContainer(ctx, templateParameters, ResourceCustomizationModeMergeExistingResources, container); err != nil {
return nil, err
}

Expand Down
27 changes: 20 additions & 7 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,20 +255,20 @@ func ApplyFlytePodConfiguration(ctx context.Context, tCtx pluginsCore.TaskExecut

// ToK8sPodSpec builds a PodSpec and ObjectMeta based on the definition passed by the TaskExecutionContext. This
// involves parsing the raw PodSpec definition and applying all Flyte configuration options.
func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, *metav1.ObjectMeta, error) {
func ToK8sPodSpec(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (*v1.PodSpec, *metav1.ObjectMeta, string, error) {
// build raw PodSpec and ObjectMeta
podSpec, objectMeta, primaryContainerName, err := BuildRawPod(ctx, tCtx)
if err != nil {
return nil, nil, err
return nil, nil, "", err
}

// add flyte configuration
podSpec, objectMeta, err = ApplyFlytePodConfiguration(ctx, tCtx, podSpec, objectMeta, primaryContainerName)
if err != nil {
return nil, nil, err
return nil, nil, "", err
}

return podSpec, objectMeta, nil
return podSpec, objectMeta, primaryContainerName, nil
}

// getBasePodTemplate attempts to retrieve the PodTemplate to use as the base for k8s Pod configuration. This value can
Expand Down Expand Up @@ -664,13 +664,13 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {
var lastTransitionTime metav1.Time
containerStatuses := append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...)
for _, containerStatus := range containerStatuses {
if r := containerStatus.LastTerminationState.Running; r != nil {
if r := containerStatus.State.Running; r != nil {
if r.StartedAt.Unix() > lastTransitionTime.Unix() {
lastTransitionTime = r.StartedAt
}
} else if r := containerStatus.LastTerminationState.Terminated; r != nil {
} else if r := containerStatus.State.Terminated; r != nil {
if r.FinishedAt.Unix() > lastTransitionTime.Unix() {
lastTransitionTime = r.StartedAt
lastTransitionTime = r.FinishedAt
}
}
}
Expand All @@ -681,3 +681,16 @@ func GetLastTransitionOccurredAt(pod *v1.Pod) metav1.Time {

return lastTransitionTime
}

func GetReportedAt(pod *v1.Pod) metav1.Time {
var reportedAt metav1.Time
for _, condition := range pod.Status.Conditions {
if condition.Reason == "PodCompleted" && condition.Type == v1.PodReady && condition.Status == v1.ConditionFalse {
if condition.LastTransitionTime.Unix() > reportedAt.Unix() {
reportedAt = condition.LastTransitionTime
}
}
}

return reportedAt
}
18 changes: 9 additions & 9 deletions go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func toK8sPodInterruptible(t *testing.T) {
},
})

p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.Len(t, p.Tolerations, 2)
assert.Equal(t, "x/flyte", p.Tolerations[1].Key)
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestToK8sPod(t *testing.T) {
},
})

p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.Equal(t, len(p.Tolerations), 1)
})
Expand All @@ -408,7 +408,7 @@ func TestToK8sPod(t *testing.T) {
},
})

p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.Equal(t, len(p.Tolerations), 0)
assert.Equal(t, "some-acceptable-name", p.Containers[0].Name)
Expand All @@ -435,7 +435,7 @@ func TestToK8sPod(t *testing.T) {
DefaultMemoryRequest: resource.MustParse("1024Mi"),
}))

p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.Equal(t, 1, len(p.NodeSelector))
assert.Equal(t, "myScheduler", p.SchedulerName)
Expand All @@ -452,7 +452,7 @@ func TestToK8sPod(t *testing.T) {
}))

x := dummyExecContext(&v1.ResourceRequirements{})
p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.NotNil(t, p.SecurityContext)
assert.Equal(t, *p.SecurityContext.RunAsGroup, v)
Expand All @@ -464,7 +464,7 @@ func TestToK8sPod(t *testing.T) {
EnableHostNetworkingPod: &enabled,
}))
x := dummyExecContext(&v1.ResourceRequirements{})
p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.True(t, p.HostNetwork)
})
Expand All @@ -475,15 +475,15 @@ func TestToK8sPod(t *testing.T) {
EnableHostNetworkingPod: &enabled,
}))
x := dummyExecContext(&v1.ResourceRequirements{})
p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.False(t, p.HostNetwork)
})

t.Run("skipSettingHostNetwork", func(t *testing.T) {
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{}))
x := dummyExecContext(&v1.ResourceRequirements{})
p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.False(t, p.HostNetwork)
})
Expand Down Expand Up @@ -517,7 +517,7 @@ func TestToK8sPod(t *testing.T) {
}))

x := dummyExecContext(&v1.ResourceRequirements{})
p, _, err := ToK8sPodSpec(ctx, x)
p, _, _, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.NotNil(t, p.DNSConfig)
assert.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, p.DNSConfig.Nameservers)
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/pluginmachinery/internal/webapi/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (c CorePlugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext)
return core.UnknownTransition, err
}

return core.DoTransitionType(core.TransitionTypeBarrier, phaseInfo), nil
return core.DoTransition(phaseInfo), nil
}

func (c CorePlugin) Abort(ctx context.Context, tCtx core.TaskExecutionContext) error {
Expand Down
34 changes: 34 additions & 0 deletions go/tasks/pluginmachinery/k8s/mocks/plugin_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions go/tasks/pluginmachinery/k8s/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ type PluginContext interface {

// Returns a handle to the Task's execution metadata.
TaskExecutionMetadata() pluginsCore.TaskExecutionMetadata

// Returns a reader that retrieves previously stored plugin internal state. the state itself is immutable
PluginStateReader() pluginsCore.PluginStateReader
}

// PluginState defines the state of a k8s plugin. This information must be maintained between propeller evaluations to
// determine if there have been any updates since the previously evaluation.
type PluginState struct {
// Phase is the plugin phase.
Phase pluginsCore.Phase
// PhaseVersion is an number used to indicate reportable changes to state that have the same phase.
PhaseVersion uint32
// Reason is the message explaining the purpose for being in the reported state.
Reason string
}

// Defines a simplified interface to author plugins for k8s resources.
Expand Down
46 changes: 30 additions & 16 deletions go/tasks/plugins/array/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,20 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex

size := -1
var literalCollection *idlCore.LiteralCollection
var discoveredInputName string
literals := make([][]*idlCore.Literal, 0)
discoveredInputNames := make([]string, 0)
for inputName, literal := range inputs.Literals {
if literalCollection = literal.GetCollection(); literalCollection != nil {
size = len(literal.GetCollection().Literals)
discoveredInputName = inputName
break
// validate length of input list
if size != -1 && size != len(literalCollection.Literals) {
state = state.SetPhase(arrayCore.PhasePermanentFailure, 0).SetReason("all maptask input lists must be the same length")
return state, nil
}

literals = append(literals, literalCollection.Literals)
discoveredInputNames = append(discoveredInputNames, inputName)

size = len(literalCollection.Literals)
}
}

Expand All @@ -105,7 +113,7 @@ func DetermineDiscoverability(ctx context.Context, tCtx core.TaskExecutionContex
arrayJobSize = int64(size)

// build input readers
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literalCollection.Literals, discoveredInputName)
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literals, discoveredInputNames)
}

if arrayJobSize > maxArrayJobSize {
Expand Down Expand Up @@ -242,16 +250,17 @@ func WriteToDiscovery(ctx context.Context, tCtx core.TaskExecutionContext, state
}

var literalCollection *idlCore.LiteralCollection
var discoveredInputName string
literals := make([][]*idlCore.Literal, 0)
discoveredInputNames := make([]string, 0)
for inputName, literal := range inputs.Literals {
if literalCollection = literal.GetCollection(); literalCollection != nil {
discoveredInputName = inputName
break
literals = append(literals, literalCollection.Literals)
discoveredInputNames = append(discoveredInputNames, inputName)
}
}

// build input readers
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literalCollection.Literals, discoveredInputName)
inputReaders = ConstructStaticInputReaders(tCtx.InputReader(), literals, discoveredInputNames)
}

// output reader
Expand Down Expand Up @@ -470,14 +479,19 @@ func ConstructCatalogReaderWorkItems(ctx context.Context, taskReader core.TaskRe

// ConstructStaticInputReaders constructs input readers that comply with the io.InputReader interface but have their
// inputs already populated.
func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputs []*idlCore.Literal, inputName string) []io.InputReader {
func ConstructStaticInputReaders(inputPaths io.InputFilePaths, inputs [][]*idlCore.Literal, inputNames []string) []io.InputReader {
inputReaders := make([]io.InputReader, 0, len(inputs))
for i := 0; i < len(inputs); i++ {
inputReaders = append(inputReaders, NewStaticInputReader(inputPaths, &idlCore.LiteralMap{
Literals: map[string]*idlCore.Literal{
inputName: inputs[i],
},
}))
if len(inputs) == 0 {
return inputReaders
}

for i := 0; i < len(inputs[0]); i++ {
literals := make(map[string]*idlCore.Literal)
for j := 0; j < len(inputNames); j++ {
literals[inputNames[j]] = inputs[j][i]
}

inputReaders = append(inputReaders, NewStaticInputReader(inputPaths, &idlCore.LiteralMap{Literals: literals}))
}

return inputReaders
Expand Down
Loading

0 comments on commit 113947f

Please sign in to comment.