Skip to content

Commit

Permalink
Add tekton loop dsl extension skeleton (kubeflow#799)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tomcli authored Dec 16, 2021
1 parent c4f52e5 commit b16bd88
Show file tree
Hide file tree
Showing 5 changed files with 347 additions and 1 deletion.
70 changes: 69 additions & 1 deletion sdk/python/kfp_tekton/tekton.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Iterable, Union
from typing import List, Iterable, Union, Optional, TypeVar
from kfp.dsl import _pipeline_param, _for_loop
from kfp import dsl
from kfp import components
from kfp.dsl._pipeline_param import ConditionOperator
from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name
from kfp_tekton.compiler._op_to_template import TEKTON_BASH_STEP_IMAGE


CEL_EVAL_IMAGE = "aipipeline/cel-eval:latest"
Expand All @@ -25,6 +27,7 @@
TEKTON_CUSTOM_TASK_IMAGES = [CEL_EVAL_IMAGE]
LOOP_PIPELINE_NAME_LENGTH = 40
LOOP_GROUP_NAME_LENGTH = 16
_Num = TypeVar('_Num', int, float)


def AnySequencer(any: Iterable[Union[dsl.ContainerOp, ConditionOperator]],
Expand Down Expand Up @@ -177,3 +180,68 @@ def CEL_ConditionOp(condition_statement):
ConditionOp = ConditionOp_template(condition_statement)
ConditionOp.add_pod_annotation("valid_container", "false")
return ConditionOp


def Break():
'''A BreakOp template for Break Operation using PipelineLoop
'''
BreakOp_yaml = '''\
name: 'pipelineloop-break-operation'
description: 'Break Operation using PipelineLoop'
implementation:
container:
image: %s
command:
- sh
- -c
- |
echo "$0"
args:
- "break loop"
''' % (TEKTON_BASH_STEP_IMAGE)
BreakOp_template = components.load_component_from_text(BreakOp_yaml)
BreakOp = BreakOp_template()
return BreakOp


class Loop(dsl.ParallelFor):

@classmethod
def sequential(self,
loop_args: _for_loop.ItemList):
return Loop(loop_args=loop_args, parallelism=1)

@classmethod
def from_string(self,
loop_args: Union[str, _pipeline_param.PipelineParam],
separator: Optional[str] = None,
parallelism: Optional[int] = None):
return Loop(loop_args=loop_args, separator=separator, parallelism=parallelism)

@classmethod
def range(self,
a: _Num,
b: _Num,
c: Optional[_Num] = None,
parallelism: Optional[int] = None):
return Loop(start=a, step=b, end=c, parallelism=parallelism)

def __init__(self,
loop_args: Union[_for_loop.ItemList,
_pipeline_param.PipelineParam] = None,
start: _Num = None,
end: _Num = None,
step: _Num = None,
separator: Optional[str] = None,
parallelism: Optional[int] = None):
tekton_params = (start, end, step, separator)
if loop_args and not [x for x in tekton_params if x is not None]:
super().__init__(loop_args=loop_args, parallelism=parallelism)
elif loop_args and separator:
# TODO: implement loop separator DSL extension
pass
elif start and end:
# TODO: implement loop start, end, step DSL extension
pass
else:
raise("loop_args or start/end parameters are missing for 'Loop' class")
7 changes: 7 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ def test_recur_cond_workflow(self):
from .testdata.recur_cond import recur_and_condition
self._test_pipeline_workflow(recur_and_condition, 'recur_cond.yaml')

def test_recur_cond_workflow(self):
"""
Test compiling a loop workflow using tekton loop dsl extension.
"""
from .testdata.tekton_loop_dsl import pipeline
self._test_pipeline_workflow(pipeline, 'tekton_loop_dsl.yaml')

def test_cond_recur_workflow(self):
"""
Test compiling a conditional recursive workflow.
Expand Down
57 changes: 57 additions & 0 deletions sdk/python/tests/compiler/testdata/tekton_loop_dsl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp.dsl as dsl
from kfp import components
from kfp_tekton import tekton

op1_yaml = '''\
name: 'my-in-coop1'
inputs:
- {name: item, type: Integer}
- {name: my_pipe_param, type: Integer}
implementation:
container:
image: library/bash:4.4.23
command: ['sh', '-c']
args:
- |
set -e
echo op1 "$0" "$1"
- {inputValue: item}
- {inputValue: my_pipe_param}
'''


@dsl.pipeline(name='my-pipeline')
def pipeline(my_pipe_param: int = 10):
loop_args = [1, 2]
# The DSL above should produce the same result and the DSL in the bottom
# with dsl.ParallelFor(loop_args, parallelism=1) as item:
# op1_template = components.load_component_from_text(op1_yaml)
# op1 = op1_template(item, my_pipe_param)
# condi_1 = tekton.CEL_ConditionOp(f"{item} == 0").output
# with dsl.Condition(condi_1 == 'true'):
# tekton.Break()
with tekton.Loop.sequential(loop_args) as item:
op1_template = components.load_component_from_text(op1_yaml)
op1 = op1_template(item, my_pipe_param)
condi_1 = tekton.CEL_ConditionOp(f"{item} == 1").output
with dsl.Condition(condi_1 == 'true'):
tekton.Break()


if __name__ == '__main__':
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(pipeline, __file__.replace('.py', '.yaml'))
135 changes: 135 additions & 0 deletions sdk/python/tests/compiler/testdata/tekton_loop_dsl.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: my-pipeline
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"my-in-coop1": [], "pipelineloop-break-operation":
[]}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param",
"optional": true, "type": "Integer"}], "name": "my-pipeline"}'
spec:
params:
- name: my_pipe_param
value: '10'
pipelineSpec:
params:
- name: my_pipe_param
default: '10'
tasks:
- name: my-pipeline-for-loop-2
params:
- name: loop-item-param-1
value: '[1, 2]'
- name: my_pipe_param
value: $(params.my_pipe_param)
taskSpec:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
spec:
pipelineSpec:
params:
- name: loop-item-param-1
type: string
- name: my_pipe_param
type: string
tasks:
- name: my-in-coop1
params:
- name: loop-item-param-1
value: $(params.loop-item-param-1)
- name: my_pipe_param
value: $(params.my_pipe_param)
taskSpec:
steps:
- name: main
args:
- |
set -e
echo op1 "$0" "$1"
- $(inputs.params.loop-item-param-1)
- $(inputs.params.my_pipe_param)
command:
- sh
- -c
image: library/bash:4.4.23
params:
- name: loop-item-param-1
type: string
- name: my_pipe_param
type: string
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"args": ["set -e\necho op1 \"$0\" \"$1\"\n", {"inputValue":
"item"}, {"inputValue": "my_pipe_param"}], "command": ["sh",
"-c"], "image": "library/bash:4.4.23"}}, "inputs": [{"name":
"item", "type": "Integer"}, {"name": "my_pipe_param", "type":
"Integer"}], "name": "my-in-coop1"}'
tekton.dev/template: ''
timeout: 525600m
- name: condition-cel
params:
- name: outcome
value: $(params.loop-item-param-1) == 1
taskRef:
name: cel_condition
apiVersion: cel.tekton.dev/v1alpha1
kind: CEL
timeout: 525600m
- name: pipelineloop-break-operation
taskSpec:
steps:
- name: main
args:
- break loop
command:
- sh
- -c
- |
echo "$0"
image: busybox
metadata:
labels:
pipelines.kubeflow.org/pipelinename: ''
pipelines.kubeflow.org/generation: ''
pipelines.kubeflow.org/cache_enabled: "true"
annotations:
pipelines.kubeflow.org/component_spec: '{"description": "Break
Operation using PipelineLoop", "implementation": {"container":
{"args": ["break loop"], "command": ["sh", "-c", "echo \"$0\"\n"],
"image": "busybox"}}, "name": "pipelineloop-break-operation"}'
tekton.dev/template: ''
when:
- input: $(tasks.condition-cel.results.outcome)
operator: in
values:
- "true"
timeout: 525600m
parallelism: 1
iterateParam: loop-item-param-1
timeout: 525600m
79 changes: 79 additions & 0 deletions sdk/python/tests/compiler/testdata/tekton_loop_dsl_noninlined.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2021 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: PipelineRun
metadata:
name: my-pipeline
annotations:
tekton.dev/output_artifacts: '{}'
tekton.dev/input_artifacts: '{}'
tekton.dev/artifact_bucket: mlpipeline
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
tekton.dev/artifact_endpoint_scheme: http://
tekton.dev/artifact_items: '{"my-in-coop1": [], "pipelineloop-break-operation":
[]}'
sidecar.istio.io/inject: "false"
pipelines.kubeflow.org/big_data_passing_format: $(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "10", "name": "my_pipe_param",
"optional": true, "type": "Integer"}], "name": "my-pipeline"}'
tekton.dev/resource_templates: '[{"apiVersion": "custom.tekton.dev/v1alpha1",
"kind": "PipelineLoop", "metadata": {"name": "my-pipeline-for-loop-2"}, "spec":
{"iterateParam": "loop-item-param-1", "parallelism": 1, "pipelineSpec": {"params":
[{"name": "loop-item-param-1", "type": "string"}, {"name": "my_pipe_param",
"type": "string"}], "tasks": [{"name": "my-in-coop1", "params": [{"name": "loop-item-param-1",
"value": "$(params.loop-item-param-1)"}, {"name": "my_pipe_param", "value":
"$(params.my_pipe_param)"}], "taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec":
"{\"implementation\": {\"container\": {\"args\": [\"set -e\\necho op1 \\\"$0\\\"
\\\"$1\\\"\\n\", {\"inputValue\": \"item\"}, {\"inputValue\": \"my_pipe_param\"}],
\"command\": [\"sh\", \"-c\"], \"image\": \"library/bash:4.4.23\"}}, \"inputs\":
[{\"name\": \"item\", \"type\": \"Integer\"}, {\"name\": \"my_pipe_param\",
\"type\": \"Integer\"}], \"name\": \"my-in-coop1\"}", "tekton.dev/template":
""}, "labels": {"pipelines.kubeflow.org/cache_enabled": "true", "pipelines.kubeflow.org/generation":
"", "pipelines.kubeflow.org/pipelinename": ""}}, "params": [{"name": "loop-item-param-1",
"type": "string"}, {"name": "my_pipe_param", "type": "string"}], "steps": [{"args":
["set -e\necho op1 \"$0\" \"$1\"\n", "$(inputs.params.loop-item-param-1)", "$(inputs.params.my_pipe_param)"],
"command": ["sh", "-c"], "image": "library/bash:4.4.23", "name": "main"}]},
"timeout": "525600m"}, {"name": "condition-cel", "params": [{"name": "outcome",
"value": "$(params.loop-item-param-1) == 1"}], "taskRef": {"apiVersion": "cel.tekton.dev/v1alpha1",
"kind": "CEL", "name": "cel_condition"}, "timeout": "525600m"}, {"name": "pipelineloop-break-operation",
"taskSpec": {"metadata": {"annotations": {"pipelines.kubeflow.org/component_spec":
"{\"description\": \"Break Operation using PipelineLoop\", \"implementation\":
{\"container\": {\"args\": [\"break loop\"], \"command\": [\"sh\", \"-c\", \"echo
\\\"$0\\\"\\n\"], \"image\": \"busybox\"}}, \"name\": \"pipelineloop-break-operation\"}",
"tekton.dev/template": ""}, "labels": {"pipelines.kubeflow.org/cache_enabled":
"true", "pipelines.kubeflow.org/generation": "", "pipelines.kubeflow.org/pipelinename":
""}}, "steps": [{"args": ["break loop"], "command": ["sh", "-c", "echo \"$0\"\n"],
"image": "busybox", "name": "main"}]}, "timeout": "525600m", "when": [{"input":
"$(tasks.condition-cel.results.outcome)", "operator": "in", "values": ["true"]}]}]}}}]'
spec:
params:
- name: my_pipe_param
value: '10'
pipelineSpec:
params:
- name: my_pipe_param
default: '10'
tasks:
- name: my-pipeline-for-loop-2
taskRef:
apiVersion: custom.tekton.dev/v1alpha1
kind: PipelineLoop
name: my-pipeline-for-loop-2
params:
- name: loop-item-param-1
value: '[1, 2]'
- name: my_pipe_param
value: $(params.my_pipe_param)
timeout: 525600m

0 comments on commit b16bd88

Please sign in to comment.