Skip to content

Commit

Permalink
Adding workflow name to workflow crd labels (flyteorg#39)
Browse files Browse the repository at this point in the history
* Adding workflow name to workflow crd labels

* node id

* test

* cr feedback

* lint

* adding taskname label

* bogus
  • Loading branch information
surindersinghp authored Dec 6, 2019
1 parent a71a62c commit f487cfe
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 2 deletions.
13 changes: 12 additions & 1 deletion pkg/compiler/transformers/k8s/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/compiler/common"
"github.com/lyft/flytepropeller/pkg/compiler/errors"
"github.com/lyft/flytepropeller/pkg/utils"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const ExecutionIDLabel = "execution-id"
const WorkflowIDLabel = "workflow-id"
const WorkflowNameLabel = "workflow-name"

func requiresInputs(w *core.WorkflowTemplate) bool {
if w == nil || w.GetInterface() == nil || w.GetInterface().GetInputs() == nil ||
Expand All @@ -25,6 +26,7 @@ func requiresInputs(w *core.WorkflowTemplate) bool {
return len(w.GetInterface().GetInputs().Variables) > 0
}

// Note: Update WorkflowNameFromID for any change made to WorkflowIDAsString
func WorkflowIDAsString(id *core.Identifier) string {
b := strings.Builder{}
_, err := b.WriteString(id.Project)
Expand Down Expand Up @@ -55,6 +57,14 @@ func WorkflowIDAsString(id *core.Identifier) string {
return b.String()
}

func WorkflowNameFromID(id string) string {
tokens := strings.Split(id, ":")
if len(tokens) != 3 {
return ""
}
return tokens[2]
}

func buildFlyteWorkflowSpec(wf *core.CompiledWorkflow, tasks []*core.CompiledTask, errs errors.CompileErrors) (
spec *v1alpha1.WorkflowSpec) {
var failureN *v1alpha1.NodeSpec
Expand Down Expand Up @@ -171,6 +181,7 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
if err != nil {
errs.Collect(errors.NewWorkflowBuildError(err))
}
obj.ObjectMeta.Labels[WorkflowNameLabel] = utils.SanitizeLabelValue(WorkflowNameFromID(primarySpec.ID))

if obj.Nodes == nil || obj.Connections.DownstreamEdges == nil {
// If we come here, we'd better have an error generated earlier. Otherwise, add one to make sure build fails.
Expand Down
1 change: 1 addition & 0 deletions pkg/compiler/transformers/k8s/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestBuildFlyteWorkflow(t *testing.T) {
},
},
nil, nil, "")
assert.Equal(t, "wf-1", wf.Labels[WorkflowNameLabel])
assert.NoError(t, err)
assert.NotNil(t, wf)
errors.SetConfig(errors.Config{})
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ func TestNodeExecutor_RecursiveNodeHandler_Recurse(t *testing.T) {
mockWf.On("GetExecutionStatus").Return(mockWfStatus)
mockWf.On("GetTask", taskID0).Return(tk, nil)
mockWf.On("GetTask", taskID).Return(tk, nil)
mockWf.On("GetLabels").Return(make(map[string]string))
mockWfStatus.On("GetDataDir").Return(storage.DataReference("x"))
return mockWf, mockN2Status
}
Expand Down
22 changes: 21 additions & 1 deletion pkg/controller/nodes/node_exec_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@ import (

"github.com/lyft/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/lyft/flytepropeller/pkg/controller/nodes/handler"
"github.com/lyft/flytepropeller/pkg/utils"
)

const NodeIDLabel = "node-id"
const TaskNameLabel = "task-name"

type execMetadata struct {
v1alpha1.WorkflowMeta
}
Expand All @@ -38,6 +42,7 @@ type execContext struct {
nsm *nodeStateManager
enqueueOwner func() error
w v1alpha1.ExecutableWorkflow
nodeLabels map[string]string
}

func (e execContext) EnqueueOwnerFunc() func() error {
Expand Down Expand Up @@ -96,9 +101,23 @@ func (e execContext) MaxDatasetSizeBytes() int64 {
return e.maxDatasetSizeBytes
}

func (e execContext) GetLabels() map[string]string {
return e.nodeLabels
}

func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.ExecutableWorkflow, node v1alpha1.ExecutableNode, nodeStatus v1alpha1.ExecutableNodeStatus, inputs io.InputReader, maxDatasetSize int64, er events.TaskEventRecorder, tr handler.TaskReader, nsm *nodeStateManager, enqueueOwner func() error) *execContext {
md := execMetadata{WorkflowMeta: w}
nodeLabels := md.GetLabels()
if nodeLabels == nil {
nodeLabels = make(map[string]string)
}
nodeLabels[NodeIDLabel] = utils.SanitizeLabelValue(node.GetID())
if tr != nil && tr.GetTaskID() != nil {
nodeLabels[TaskNameLabel] = utils.SanitizeLabelValue(tr.GetTaskID().Name)
}

return &execContext{
md: execMetadata{WorkflowMeta: w},
md: md,
store: store,
node: node,
nodeStatus: nodeStatus,
Expand All @@ -109,6 +128,7 @@ func newNodeExecContext(_ context.Context, store *storage.DataStore, w v1alpha1.
nsm: nsm,
enqueueOwner: enqueueOwner,
w: w,
nodeLabels: nodeLabels,
}
}

Expand Down
17 changes: 17 additions & 0 deletions pkg/utils/k8s.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package utils

import (
"regexp"
"strings"

"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
Expand All @@ -9,13 +12,16 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation"
)

var NotTheOwnerError = errors.Errorf("FlytePropeller is not the owner")

// ResourceNvidiaGPU is the name of the Nvidia GPU resource.
const ResourceNvidiaGPU = "nvidia.com/gpu"

var invalidDNS1123Characters = regexp.MustCompile("[^-a-z0-9]+")

func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar {
envVars := make([]v1.EnvVar, 0, len(env))
for _, kv := range env {
Expand Down Expand Up @@ -94,6 +100,7 @@ func GetWorkflowIDFromOwner(reference *metav1.OwnerReference, namespace string)
}
return "", NotTheOwnerError
}

func GetProtoTime(t *metav1.Time) *timestamp.Timestamp {
if t != nil {
pTime, err := ptypes.TimestampProto(t.Time)
Expand All @@ -103,3 +110,13 @@ func GetProtoTime(t *metav1.Time) *timestamp.Timestamp {
}
return ptypes.TimestampNow()
}

// SanitizeLabelValue ensures that the label value is a valid DNS-1123 string
func SanitizeLabelValue(name string) string {
name = strings.ToLower(name)
name = invalidDNS1123Characters.ReplaceAllString(name, "-")
if len(name) > validation.DNS1123LabelMaxLength {
name = name[0:validation.DNS1123LabelMaxLength]
}
return strings.Trim(name, "-")
}

0 comments on commit f487cfe

Please sign in to comment.