Skip to content

Commit

Permalink
POC hashing command and args
Browse files Browse the repository at this point in the history
Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>
  • Loading branch information
droctothorpe and zazulam committed Aug 6, 2024
1 parent f962ef3 commit 1479067
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 38 deletions.
49 changes: 25 additions & 24 deletions backend/src/v2/compiler/argocompiler/argo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package argocompiler

import (
"crypto/sha256"
"encoding/hex"
"fmt"
"strings"

Expand Down Expand Up @@ -191,27 +193,27 @@ const (
)

// saveComponentSpec hands the component spec to saveProtoToArguments under the
// name argumentsComponents followed by a key derived from the component name,
// and returns whatever error that call produces.
//
// NOTE(review): this text is a rendered diff, so two assignments to
// functionName appear back to back; the extractFunctionName line looks like
// the removed version and the hashComponentCommandAndArgs line its
// replacement — confirm against the checked-in file.
func (c *workflowCompiler) saveComponentSpec(name string, spec *pipelinespec.ComponentSpec) error {
functionName := c.extractFunctionName(name)
functionName := c.hashComponentCommandAndArgs(name)

return c.saveProtoToArguments(argumentsComponents+functionName, spec)
}

// useComponentSpec returns a placeholder that refers to the component spec and
// can be used in argo workflow fields. The placeholder is built by
// argumentsPlaceholder from argumentsComponents followed by a key derived from
// the component name, i.e. the same name saveComponentSpec saves under.
//
// NOTE(review): this text is a rendered diff, so two assignments to
// functionName appear back to back; the extractFunctionName line looks like
// the removed version and the hashComponentCommandAndArgs line its
// replacement — confirm against the checked-in file.
func (c *workflowCompiler) useComponentSpec(name string) (string, error) {
functionName := c.extractFunctionName(name)
functionName := c.hashComponentCommandAndArgs(name)

return c.argumentsPlaceholder(argumentsComponents + functionName)
}

// saveComponentImpl hands the component implementation message to
// saveProtoToArguments under the name argumentsContainers followed by a key
// derived from the component name, and returns whatever error that call
// produces.
//
// NOTE(review): this text is a rendered diff, so two assignments to
// functionName appear back to back; the extractFunctionName line looks like
// the removed version and the hashComponentCommandAndArgs line its
// replacement — confirm against the checked-in file.
func (c *workflowCompiler) saveComponentImpl(name string, msg proto.Message) error {
functionName := c.extractFunctionName(name)
functionName := c.hashComponentCommandAndArgs(name)

return c.saveProtoToArguments(argumentsContainers+functionName, msg)
}

// useComponentImpl returns a placeholder for the component implementation. The
// placeholder is built by argumentsPlaceholder from argumentsContainers
// followed by a key derived from the component name, i.e. the same name
// saveComponentImpl saves under.
//
// NOTE(review): this text is a rendered diff, so two assignments to
// functionName appear back to back; the extractFunctionName line looks like
// the removed version and the hashComponentCommandAndArgs line its
// replacement — confirm against the checked-in file.
func (c *workflowCompiler) useComponentImpl(name string) (string, error) {
functionName := c.extractFunctionName(name)
functionName := c.hashComponentCommandAndArgs(name)

return c.argumentsPlaceholder(argumentsContainers + functionName)
}
Expand Down Expand Up @@ -266,9 +268,9 @@ func (c *workflowCompiler) argumentsPlaceholder(componentName string) (string, e
return workflowParameter(componentName), nil
}

// extractFunctionName extracts the function name of a component by looking it
// up in the pipeline spec.
func (c *workflowCompiler) extractFunctionName(componentName string) string {
// hashComponentCommandAndArgs combines and hashes command and args fields of a
// given component.
func (c *workflowCompiler) hashComponentCommandAndArgs(componentName string) string {
log.Debug("componentName: ", componentName)
// The root component is a DAG and therefore doesn't have a corresponding
// executor or function name. The final return statement in this function
Expand All @@ -278,35 +280,34 @@ func (c *workflowCompiler) extractFunctionName(componentName string) string {
}
executorLabel := c.spec.Components[componentName].GetExecutorLabel()
log.Debug("executorLabel: ", executorLabel)
// There are more nested conditionals here than we would prefer, but we
// don't want to make any assumptions about the presence of specific fields
// in the IR.
if c.executors != nil {
for executorName, executorValue := range c.executors {
log.Debug("executorName: ", executorName)
if executorName == executorLabel {
args := executorValue.GetContainer().GetArgs()
if args != nil {
if len(args) > 1 {
penultimateArg := args[len(args)-2]
if penultimateArg == "--function_to_execute" {
componentFunctionName := args[len(args)-1]
log.Debug("componentFunctionName: ", componentFunctionName)
return componentFunctionName
}
}
commandList := executorValue.GetContainer().GetCommand()
argList := executorValue.GetContainer().GetArgs()
if commandList == nil && argList == nil {
return componentName
}
stringToHash := strings.Join(commandList, " ")
if argList != nil {
stringToHash += strings.Join(argList, " ")
return hashString(stringToHash)
}
}
}
}

log.Debug("No corresponding executor for component: ", componentName)
// We could theoretically return an error here, but since the only
// consequence of not finding a matching executor is reduced deduplication,
// this doesn't result in application failure and we therefore continue.
return componentName
}

// hashString returns the SHA-256 digest of s encoded as a lowercase
// hexadecimal string (64 characters).
func hashString(s string) string {
	digest := sha256.Sum256([]byte(s))
	return hex.EncodeToString(digest[:])
}

const (
paramComponent = "component" // component spec
paramTask = "task" // task spec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ spec:
parameters:
- name: kubernetes-comp-comp
value: '{"pvcMount":[{"mountPath":"/data","taskOutputParameter":{"outputParameterKey":"name","producerTask":"createpvc"}}]}'
- name: components-comp
- name: components-db3197b81c484742322b041296c85f7998bd2442d4ffa0459f23dc73c57b9ef1
value: '{"executorLabel":"exec-comp"}'
- name: implementations-comp
- name: implementations-db3197b81c484742322b041296c85f7998bd2442d4ffa0459f23dc73c57b9ef1
value: '{"args":["--executor_input","{{$}}","--function_to_execute","comp"],"command":["sh","-c","\nif
! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3
-m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
Expand Down Expand Up @@ -197,11 +197,11 @@ spec:
- arguments:
parameters:
- name: component
value: '{{workflow.parameters.components-comp}}'
value: '{{workflow.parameters.components-db3197b81c484742322b041296c85f7998bd2442d4ffa0459f23dc73c57b9ef1}}'
- name: task
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp"},"dependentTasks":["createpvc"],"taskInfo":{"name":"comp"}}'
- name: container
value: '{{workflow.parameters.implementations-comp}}'
value: '{{workflow.parameters.implementations-db3197b81c484742322b041296c85f7998bd2442d4ffa0459f23dc73c57b9ef1}}'
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
- name: kubernetes-config
Expand All @@ -222,11 +222,11 @@ spec:
- arguments:
parameters:
- name: component
value: '{{workflow.parameters.components-comp}}'
value: '{{workflow.parameters.components-db3197b81c484742322b041296c85f7998bd2442d4ffa0459f23dc73c57b9ef1}}'
- name: task
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-comp-2"},"dependentTasks":["comp","createpvc"],"taskInfo":{"name":"comp-2"}}'
- name: container
value: '{{workflow.parameters.implementations-comp}}'
value: '{{workflow.parameters.implementations-db3197b81c484742322b041296c85f7998bd2442d4ffa0459f23dc73c57b9ef1}}'
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
- name: kubernetes-config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ spec:
parameters:
- name: kubernetes-comp-hello-world
value: '{"podMetadata":{"annotations":{"experiment_id":"234567","run_id":"123456"},"labels":{"kubeflow.com/common":"test","kubeflow.com/kfp":"pipeline-node"}}}'
- name: components-comp-hello-world
- name: components-285f834445312aa2162cf75d155ac477a6149126369484809c864b8d1943533b
value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}'
- name: implementations-comp-hello-world
- name: implementations-285f834445312aa2162cf75d155ac477a6149126369484809c864b8d1943533b
value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf
\"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def
hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser
Expand Down Expand Up @@ -191,11 +191,11 @@ spec:
- arguments:
parameters:
- name: component
value: '{{workflow.parameters.components-comp-hello-world}}'
value: '{{workflow.parameters.components-285f834445312aa2162cf75d155ac477a6149126369484809c864b8d1943533b}}'
- name: task
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}'
- name: container
value: '{{workflow.parameters.implementations-comp-hello-world}}'
value: '{{workflow.parameters.implementations-285f834445312aa2162cf75d155ac477a6149126369484809c864b8d1943533b}}'
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
- name: kubernetes-config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ metadata:
spec:
arguments:
parameters:
- name: components-comp-hello-world
- name: components-285f834445312aa2162cf75d155ac477a6149126369484809c864b8d1943533b
value: '{"executorLabel":"exec-hello-world","inputDefinitions":{"parameters":{"text":{"type":"STRING"}}}}'
- name: implementations-comp-hello-world
- name: implementations-285f834445312aa2162cf75d155ac477a6149126369484809c864b8d1943533b
value: '{"args":["--text","{{$.inputs.parameters[''text'']}}"],"command":["sh","-ec","program_path=$(mktemp)\nprintf
\"%s\" \"$0\" \u003e \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n","def
hello_world(text):\n print(text)\n return text\n\nimport argparse\n_parser
Expand Down Expand Up @@ -183,11 +183,11 @@ spec:
- arguments:
parameters:
- name: component
value: '{{workflow.parameters.components-comp-hello-world}}'
value: '{{workflow.parameters.components-285f834445312aa2162cf75d155ac477a6149126369484809c864b8d1943533b}}'
- name: task
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-hello-world"},"inputs":{"parameters":{"text":{"componentInputParameter":"text"}}},"taskInfo":{"name":"hello-world"}}'
- name: container
value: '{{workflow.parameters.implementations-comp-hello-world}}'
value: '{{workflow.parameters.implementations-285f834445312aa2162cf75d155ac477a6149126369484809c864b8d1943533b}}'
- name: parent-dag-id
value: '{{inputs.parameters.parent-dag-id}}'
name: hello-world-driver
Expand Down

0 comments on commit 1479067

Please sign in to comment.