Skip to content

Commit

Permalink
Adding workflow name to workflow crd labels (flyteorg#39)
Browse files Browse the repository at this point in the history
* Adding workflow name to workflow crd labels

* node id

* test

* cr feedback

* lint

* adding taskname label

* bogus
  • Loading branch information
surindersinghp authored Dec 6, 2019
1 parent 8bf52f4 commit 0626e81
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 2 deletions.
13 changes: 12 additions & 1 deletion flytepropeller/pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/compiler/common"
"github.com/lyft/flytepropeller/pkg/compiler/errors"
"github.com/lyft/flytepropeller/pkg/utils"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const ExecutionIDLabel = "execution-id"
const WorkflowIDLabel = "workflow-id"
const WorkflowNameLabel = "workflow-name"

func requiresInputs(w *core.WorkflowTemplate) bool {
if w == nil || w.GetInterface() == nil || w.GetInterface().GetInputs() == nil ||
Expand All @@ -25,6 +26,7 @@ func requiresInputs(w *core.WorkflowTemplate) bool {
return len(w.GetInterface().GetInputs().Variables) > 0
}

// Note: Update WorkflowNameFromID for any change made to WorkflowIDAsString
func WorkflowIDAsString(id *core.Identifier) string {
b := strings.Builder{}
_, err := b.WriteString(id.Project)
Expand Down Expand Up @@ -55,6 +57,14 @@ func WorkflowIDAsString(id *core.Identifier) string {
return b.String()
}

func WorkflowNameFromID(id string) string {
tokens := strings.Split(id, ":")
if len(tokens) != 3 {
return ""
}
return tokens[2]
}

func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTask, errs errors.CompileErrors) (
spec *v1alpha1.WorkflowSpec) {
var failureN *v1alpha1.NodeSpec
Expand Down Expand Up @@ -171,6 +181,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
}
obj.ObjectMeta.Labels[WorkflowNameLabel] = utils.SanitizeLabelValue(WorkflowNameFromID(primarySpec.ID))

if obj.Nodes == nil || obj.Connections.DownstreamEdges == nil {
// If we come here, we'd better have an error generated earlier. Otherwise, add one to make sure build fails.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestBuildFlyteWorkflow(t *testing.T) {
},
},
nil, nil, "")
assert.Equal(t, "wf-1", wf.Labels[WorkflowNameLabel])
assert.NoError(t, err)
assert.NotNil(t, wf)
errors.SetConfig(errors.Config{})
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
mockWf.On("GetExecutionStatus").Return(mockWfStatus)
mockWf.On("GetTask", taskID0).Return(tk, nil)
mockWf.On("GetTask", taskID).Return(tk, nil)
mockWf.On("GetLabels").Return(make(map[string]string))
mockWfStatus.On("GetDataDir").Return(storage.DataReference("x"))
return mockWf, mockN2Status
}
Expand Down
22 changes: 21 additions & 1 deletion flytepropeller/pkg/controller/nodes/node_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ import (

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
"github.com/lyft/flytepropeller/pkg/utils"
)

const NodeIDLabel = "node-id"
const TaskNameLabel = "task-name"

type execMetadata struct {
v1alpha1.WorkflowMeta
}
Expand All @@ -38,6 +42,7 @@ type execContext struct {
nsm *nodeStateManager
enqueueOwner func() error
w v1alpha1.ExecutableWorkflow
nodeLabels map[string]string
}

func (e execContext) EnqueueOwnerFunc() func() error {
Expand Down Expand Up @@ -96,9 +101,23 @@ func (e execContext) MaxDatasetSizeBytes() int64 {
return e.maxDatasetSizeBytes
}

func (e execContext) GetLabels() map[string]string {
return e.nodeLabels
}

func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error) *execContext {
md := execMetadata{WorkflowMeta: w}
nodeLabels := md.GetLabels()
if nodeLabels == nil {
nodeLabels = make(map[string]string)
}
nodeLabels[NodeIDLabel] = utils.SanitizeLabelValue(node.GetID())
if tr != nil && tr.GetTaskID() != nil {
nodeLabels[TaskNameLabel] = utils.SanitizeLabelValue(tr.GetTaskID().Name)
}

return &execContext{
md: execMetadata{WorkflowMeta: w},
md: md,
store: store,
node: node,
nodeStatus: nodeStatus,
Expand All @@ -109,6 +128,7 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.
nsm: nsm,
enqueueOwner: enqueueOwner,
w: w,
nodeLabels: nodeLabels,
}
}

Expand Down
17 changes: 17 additions & 0 deletions flytepropeller/pkg/utils/k8s.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package utils

import (
"regexp"
"strings"

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
Expand All @@ -9,13 +12,16 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation"
)

var NotTheOwnerError = errors.Errorf("FlytePropeller is not the owner")

// ResourceNvidiaGPU is the name of the Nvidia GPU resource.
const ResourceNvidiaGPU = "nvidia.com/gpu"

var invalidDNS1123Characters = regexp.MustCompile("[^-a-z0-9]+")

func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar {
envVars := make([]v1.EnvVar, 0, len(env))
for _, kv := range env {
Expand Down Expand Up @@ -94,6 +100,7 @@ func GetWorkflowIDFromOwner(reference *metav1.OwnerReference, namespace string)
}
return "", NotTheOwnerError
}

func GetProtoTime(t *metav1.Time) *timestamp.Timestamp {
if t != nil {
pTime, err := ptypes.TimestampProto(t.Time)
Expand All @@ -103,3 +110,13 @@ func GetProtoTime(t *metav1.Time) *timestamp.Timestamp {
}
return ptypes.TimestampNow()
}

// SanitizeLabelValue ensures that the label value is a valid DNS-1123 string
func SanitizeLabelValue(name string) string {
name = strings.ToLower(name)
name = invalidDNS1123Characters.ReplaceAllString(name, "-")
if len(name) > validation.DNS1123LabelMaxLength {
name = name[0:validation.DNS1123LabelMaxLength]
}
return strings.Trim(name, "-")
}

0 comments on commit 0626e81

Please sign in to comment.