Skip to content

Commit

Permalink
feat(backend): move comp logic to workflow params (#10979)
Browse files Browse the repository at this point in the history
* feat(backend): move comp logic to workflow params

Signed-off-by: zazulam <[email protected]>
Co-authored-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: MonicaZhang1 <[email protected]>
Co-authored-by: kylekaminky <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
Signed-off-by: zazulam <[email protected]>

* address pr comments

Signed-off-by: zazulam <[email protected]>

* Use function name instead of base name and address edge cases

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Improve logic and update tests

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* POC hashing command and args

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Add comments to clarify the logic

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Hash entire PipelineContainerSpec

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

---------

Signed-off-by: zazulam <[email protected]>
Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: MonicaZhang1 <[email protected]>
Co-authored-by: kylekaminky <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>
  • Loading branch information
6 people authored and KevinGrantLee committed Sep 17, 2024
1 parent 513d46b commit e55c330
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 136 deletions.
124 changes: 98 additions & 26 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@
package argocompiler

import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"strings"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/compiler"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
k8score "k8s.io/api/core/v1"
Expand Down Expand Up @@ -63,7 +67,7 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
if err != nil {
return nil, err
}
// fill root component default paramters to PipelineJob
// fill root component default parameters to PipelineJob
specParams := spec.GetRoot().GetInputDefinitions().GetParameters()
for name, param := range specParams {
_, ok := job.RuntimeConfig.ParameterValues[name]
Expand Down Expand Up @@ -108,6 +112,9 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S
"pipelines.kubeflow.org/v2_component": "true",
},
},
Arguments: wfapi.Arguments{
Parameters: []wfapi.Parameter{},
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
},
Expand Down Expand Up @@ -180,69 +187,134 @@ func (c *workflowCompiler) templateName(componentName string) string {
return componentName
}

// WIP: store component spec, task spec and executor spec in annotations

const (
annotationComponents = "pipelines.kubeflow.org/components-"
annotationContainers = "pipelines.kubeflow.org/implementations-"
annotationKubernetesSpec = "pipelines.kubeflow.org/kubernetes-"
argumentsComponents = "components-"
argumentsContainers = "implementations-"
argumentsKubernetesSpec = "kubernetes-"
)

func (c *workflowCompiler) saveComponentSpec(name string, spec *pipelinespec.ComponentSpec) error {
return c.saveProtoToAnnotation(annotationComponents+name, spec)
hashedComponent := c.hashComponentContainer(name)

return c.saveProtoToArguments(argumentsComponents+hashedComponent, spec)
}

// useComponentSpec returns a placeholder we can refer to the component spec
// in argo workflow fields.
func (c *workflowCompiler) useComponentSpec(name string) (string, error) {
return c.annotationPlaceholder(annotationComponents + name)
hashedComponent := c.hashComponentContainer(name)

return c.argumentsPlaceholder(argumentsComponents + hashedComponent)
}

func (c *workflowCompiler) saveComponentImpl(name string, msg proto.Message) error {
return c.saveProtoToAnnotation(annotationContainers+name, msg)
hashedComponent := c.hashComponentContainer(name)

return c.saveProtoToArguments(argumentsContainers+hashedComponent, msg)
}

func (c *workflowCompiler) useComponentImpl(name string) (string, error) {
return c.annotationPlaceholder(annotationContainers + name)
hashedComponent := c.hashComponentContainer(name)

return c.argumentsPlaceholder(argumentsContainers + hashedComponent)
}

func (c *workflowCompiler) saveKubernetesSpec(name string, spec *structpb.Struct) error {
return c.saveProtoToAnnotation(annotationKubernetesSpec+name, spec)
return c.saveProtoToArguments(argumentsKubernetesSpec+name, spec)
}

func (c *workflowCompiler) useKubernetesImpl(name string) (string, error) {
return c.annotationPlaceholder(annotationKubernetesSpec + name)
return c.argumentsPlaceholder(argumentsKubernetesSpec + name)
}

// TODO(Bobgy): sanitize component name
func (c *workflowCompiler) saveProtoToAnnotation(name string, msg proto.Message) error {
// saveProtoToArguments saves a proto message to the workflow arguments. The
// message is serialized to JSON and stored in the workflow arguments and then
// referenced by the workflow templates using AWF templating syntax. The reason
// for storing it in the workflow arguments is because there is a 1-many
// relationship between components and tasks that reference them. The workflow
// arguments allow us to deduplicate the component logic (implementation & spec
// in IR), significantly reducing the size of the argo workflow manifest.
func (c *workflowCompiler) saveProtoToArguments(componentName string, msg proto.Message) error {
if c == nil {
return fmt.Errorf("compiler is nil")
}
if c.wf.Annotations == nil {
c.wf.Annotations = make(map[string]string)
if c.wf.Spec.Arguments.Parameters == nil {
c.wf.Spec.Arguments = wfapi.Arguments{Parameters: []wfapi.Parameter{}}
}
if _, alreadyExists := c.wf.Annotations[name]; alreadyExists {
return fmt.Errorf("annotation %q already exists", name)
if c.wf.Spec.Arguments.GetParameterByName(componentName) != nil {
return nil
}
json, err := stablyMarshalJSON(msg)
if err != nil {
return fmt.Errorf("saving component spec of %q to annotations: %w", name, err)
return fmt.Errorf("saving component spec of %q to arguments: %w", componentName, err)
}
// TODO(Bobgy): verify name adheres to Kubernetes annotation restrictions: https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set
c.wf.Annotations[name] = json
c.wf.Spec.Arguments.Parameters = append(c.wf.Spec.Arguments.Parameters, wfapi.Parameter{
Name: componentName,
Value: wfapi.AnyStringPtr(json),
})
return nil
}

func (c *workflowCompiler) annotationPlaceholder(name string) (string, error) {
// argumentsPlaceholder checks for the unique component name within the workflow
// arguments and returns a template tag that references the component in the
// workflow arguments.
func (c *workflowCompiler) argumentsPlaceholder(componentName string) (string, error) {
if c == nil {
return "", fmt.Errorf("compiler is nil")
}
if _, exists := c.wf.Annotations[name]; !exists {
return "", fmt.Errorf("using component spec: failed to find annotation %q", name)
if c.wf.Spec.Arguments.GetParameterByName(componentName) == nil {
return "", fmt.Errorf("using component spec: failed to find workflow parameter %q", componentName)
}

return workflowParameter(componentName), nil
}

// hashComponentContainer serializes and hashes the container field of a given
// component.
func (c *workflowCompiler) hashComponentContainer(componentName string) string {
log.Debug("componentName: ", componentName)
// Return early for root component since it has no command and args.
if componentName == "root" {
return componentName
}
// Reference: https://argoproj.github.io/argo-workflows/variables/
return fmt.Sprintf("{{workflow.annotations.%s}}", name), nil
if c.executors != nil { // Don't bother if there are no executors in the pipeline spec.
// Look up the executorLabel for the component in question.
executorLabel := c.spec.Components[componentName].GetExecutorLabel()
log.Debug("executorLabel: ", executorLabel)
// Iterate through the list of executors.
for executorName, executorValue := range c.executors {
log.Debug("executorName: ", executorName)
// If one of them matches the executorLabel we extracted earlier...
if executorName == executorLabel {
// Get the corresponding container.
container := executorValue.GetContainer()
if container != nil {
containerHash, err := hashValue(container)
if err != nil {
// Do not bubble up since this is not a breaking error
// and we can just return the componentName in full.
log.Debug("Error hashing container: ", err)
}

return containerHash
}
}
}
}

return componentName
}

// hashValue serializes and hashes a provided value.
func hashValue(value interface{}) (string, error) {
bytes, err := json.Marshal(value)
if err != nil {
return "", err
}
h := sha256.New()
h.Write([]byte(bytes))

return hex.EncodeToString(h.Sum(nil)), nil
}

const (
Expand Down
6 changes: 4 additions & 2 deletions backend/src/v2/compiler/argocompiler/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,11 @@ func (c *workflowCompiler) addContainerExecutorTemplate(refName string) string {
},
}
// Update pod metadata if it defined in the Kubernetes Spec
if kubernetesConfigString, ok := c.wf.Annotations[annotationKubernetesSpec+refName]; ok {
kubernetesConfigParam := c.wf.Spec.Arguments.GetParameterByName(argumentsKubernetesSpec + refName)

if kubernetesConfigParam != nil {
k8sExecCfg := &kubernetesplatform.KubernetesExecutorConfig{}
if err := jsonpb.UnmarshalString(kubernetesConfigString, k8sExecCfg); err == nil {
if err := jsonpb.UnmarshalString(string(*kubernetesConfigParam.Value), k8sExecCfg); err == nil {
extendPodMetadata(&executor.Metadata, k8sExecCfg)
}
}
Expand Down
Loading

0 comments on commit e55c330

Please sign in to comment.