From 8988620a18a4fd598c0d8cfa09a5df75c5d9aa32 Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Tue, 21 Apr 2020 13:23:24 -0700 Subject: [PATCH 1/3] support input artifacts --- sdk/README.md | 33 +- sdk/python/kfp_tekton/compiler/__init__.py | 1 + .../kfp_tekton/compiler/_op_to_template.py | 360 ++++++++++---- sdk/python/kfp_tekton/compiler/compiler.py | 257 ++++++++-- sdk/python/kfp_tekton/compiler/main.py | 13 +- sdk/python/tests/README.md | 39 +- sdk/python/tests/compiler/compiler_tests.py | 55 ++- .../compiler/testdata/artifact_location.py | 53 ++ .../compiler/testdata/artifact_location.yaml | 100 ++++ .../tests/compiler/testdata/condition.py | 41 ++ .../tests/compiler/testdata/condition.yaml | 89 ++++ .../compiler/testdata/imagepullsecrets.py | 61 +++ .../compiler/testdata/imagepullsecrets.yaml | 72 +++ .../testdata/input_artifact_raw_value.py | 70 +++ .../testdata/input_artifact_raw_value.txt | 1 + .../testdata/input_artifact_raw_value.yaml | 142 ++++++ sdk/python/tests/compiler/testdata/katib.py | 129 +++++ sdk/python/tests/compiler/testdata/katib.yaml | 141 ++++++ .../compiler/testdata/resourceop_basic.py | 67 +++ .../compiler/testdata/resourceop_basic.yaml | 89 ++++ .../tests/compiler/testdata/volume_op.py | 42 ++ .../tests/compiler/testdata/volume_op.yaml | 128 +++++ .../compiler/testdata/volume_snapshot_op.py | 96 ++++ .../compiler/testdata/volume_snapshot_op.yaml | 456 ++++++++++++++++++ sdk/python/tests/config.yaml | 13 + sdk/python/tests/test_kfp_samples.sh | 49 +- sdk/python/tests/test_kfp_samples_report.txt | 30 +- sdk/python/tests/test_util.py | 102 ++++ 28 files changed, 2545 insertions(+), 184 deletions(-) create mode 100644 sdk/python/tests/compiler/testdata/artifact_location.py create mode 100644 sdk/python/tests/compiler/testdata/artifact_location.yaml create mode 100644 sdk/python/tests/compiler/testdata/condition.py create mode 100644 sdk/python/tests/compiler/testdata/condition.yaml create mode 100644 sdk/python/tests/compiler/testdata/imagepullsecrets.py create mode 100644 sdk/python/tests/compiler/testdata/imagepullsecrets.yaml create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.py create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml create mode 100644 sdk/python/tests/compiler/testdata/katib.py create mode 100644 sdk/python/tests/compiler/testdata/katib.yaml create mode 100644 sdk/python/tests/compiler/testdata/resourceop_basic.py create mode 100644 sdk/python/tests/compiler/testdata/resourceop_basic.yaml create mode 100644 sdk/python/tests/compiler/testdata/volume_op.py create mode 100644 sdk/python/tests/compiler/testdata/volume_op.yaml create mode 100644 sdk/python/tests/compiler/testdata/volume_snapshot_op.py create mode 100644 sdk/python/tests/compiler/testdata/volume_snapshot_op.yaml create mode 100644 sdk/python/tests/config.yaml create mode 100644 sdk/python/tests/test_util.py diff --git a/sdk/README.md b/sdk/README.md index 7aebbddafe..cb9bad0ba4 100644 --- a/sdk/README.md +++ b/sdk/README.md @@ -82,7 +82,9 @@ Here we update the `Compiler` of the KFP SDK to generate `Tekton` YAML for a bas Please [refer to the instructions here](./python/tests/README.md) as you work on a PR test sample Kubeflow Pipelines in their test data folder to ensure your PR is improving the number of successful samples -## Compile Kubeflow Pipelines as Tekton pipelineRun +## Experimental features + +### 1. 
Compile Kubeflow Pipelines as Tekton pipelineRun By default, Tekton pipelineRun is generated by the `tkn` CLI so that users can interactively change their pipeline parameters during each execution. However, the `tkn` CLI is lacking several important features when generating pipelineRun. Therefore, we added support for generating pipelineRun using `dsl-compile-tekton` with all the latest kfp-tekton compiler features. The comparison between Tekton pipeline and Argo workflow is described in our [design docs](https://docs.google.com/document/d/1oXOdiItI4GbEe_qzyBmMAqfLBjfYX1nM94WHY3EPa94/edit#heading=h.f38y0bqkxo87). @@ -96,5 +98,32 @@ As of today, the below pipelineRun features are available within `dsl-compile-te To compile Kubeflow Pipelines as Tekton pipelineRun, simply add the `--generate-pipelinerun` flag as part of your `dsl-compile-tekton` commands. e.g. - `dsl-compile-tekton --py sdk/python/tests/compiler/testdata/tolerations.py --output pipeline.yaml --generate-pipelinerun` +### 2. Compile Kubeflow Pipelines with artifacts enabled + +Prerequisite: Install [Kubeflow Pipeline](https://www.kubeflow.org/docs/pipelines/installation/). + +By default, artifacts are disabled because they depend on Kubeflow Pipeline's minio setup. When artifacts are enabled, all the output parameters are also treated as artifacts and persisted to the default object storage. Enabling artifacts also allows files to be downloaded or stored as artifact inputs/outputs. Since artifacts depend on the Kubeflow Pipeline setup by default, the generated Tekton pipeline must be deployed to the same namespace as Kubeflow Pipeline. + +To compile Kubeflow Pipelines with artifacts enabled, simply add the `--enable-artifacts` flag to your `dsl-compile-tekton` commands. Then, run the pipeline in the same namespace as Kubeflow Pipeline using the `-n` flag. e.g. +```shell +dsl-compile-tekton --py sdk/python/tests/compiler/testdata/artifact_location.py --output pipeline.yaml --enable-artifacts +kubectl apply -f pipeline.yaml -n kubeflow +tkn pipeline start custom-artifact-location-pipeline --showlog -n kubeflow +``` + +You should see output like the below, showing that the artifacts are stored in the object storage you specified. +``` +? Value for param `secret_name` of type `string`? (Default is `mlpipeline-minio-artifact`) mlpipeline-minio-artifact +? Value for param `tag` of type `string`? (Default is `1.31.0`) 1.31.0 +? Value for param `namespace` of type `string`? (Default is `kubeflow`) kubeflow +? Value for param `bucket` of type `string`? (Default is `mlpipeline`) mlpipeline +Pipelinerun started: custom-artifact-location-pipeline-run-b87bq +Waiting for logs to be available... + +[generate-output : copy-artifacts] Added `storage` successfully. +[generate-output : copy-artifacts] `/tekton/results/output` -> `storage/mlpipeline/runs/custom-artifact-location-pipeline-run-b87bq/custom-artifact-location-pipeline-run-b87bq-generate-outp-7rnxv/output.txt` +[generate-output : copy-artifacts] Total: 0 B, Transferred: 6 B, Speed: 504 B/s +``` + ## Troubleshooting -- Please be aware that defined Affinity, Node Selector, and Tolerations are applied to all the tasks in the same pipeline because there's only one podTemplate allowed in each pipeline. \ No newline at end of file +- Please be aware that defined Affinity, Node Selector, and Tolerations are applied to all the tasks in the same pipeline because there's only one podTemplate allowed in each pipeline.
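The same artifact behavior is also exposed through the compiler's Python API via the `enable_artifacts` argument added to `TektonCompiler.compile` in this patch. Below is a minimal sketch, not part of the patch itself; the `echo_pipeline` function and the `pipeline.yaml` output path are illustrative assumptions.

```python
# Minimal sketch: compile a pipeline with artifacts enabled via the Python API,
# mirroring the --enable-artifacts CLI flag. The pipeline below is illustrative only.
from kfp import dsl
from kfp_tekton.compiler import TektonCompiler


@dsl.pipeline(name='echo', description='Writes a file output that is persisted as an artifact.')
def echo_pipeline():
    # file_outputs entries become Tekton results and, with artifacts enabled, object storage uploads
    dsl.ContainerOp(name='generate-output', image='busybox',
                    command=['sh', '-c', 'echo hello > /tmp/output.txt'],
                    file_outputs={'output': '/tmp/output.txt'})


if __name__ == '__main__':
    # equivalent to: dsl-compile-tekton --py <file> --output pipeline.yaml --enable-artifacts
    TektonCompiler().compile(echo_pipeline, 'pipeline.yaml', enable_artifacts=True)
```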
diff --git a/sdk/python/kfp_tekton/compiler/__init__.py b/sdk/python/kfp_tekton/compiler/__init__.py index ccf61d3576..2d8bebc1eb 100644 --- a/sdk/python/kfp_tekton/compiler/__init__.py +++ b/sdk/python/kfp_tekton/compiler/__init__.py @@ -43,6 +43,7 @@ def monkey_patch(): KFPCompiler._create_and_write_workflow = TektonCompiler._create_and_write_workflow KFPCompiler._create_pipeline_workflow = TektonCompiler._create_pipeline_workflow KFPCompiler._create_workflow = TektonCompiler._create_workflow + KFPCompiler._group_to_dag_template = TektonCompiler._group_to_dag_template KFPCompiler._write_workflow = TektonCompiler._write_workflow diff --git a/sdk/python/kfp_tekton/compiler/_op_to_template.py b/sdk/python/kfp_tekton/compiler/_op_to_template.py index cc5812de6f..f861e01bae 100644 --- a/sdk/python/kfp_tekton/compiler/_op_to_template.py +++ b/sdk/python/kfp_tekton/compiler/_op_to_template.py @@ -16,8 +16,14 @@ from kfp.compiler._k8s_helper import convert_k8s_obj_to_json from kfp.compiler._op_to_template import _process_obj, _inputs_to_json, _outputs_to_json from kfp import dsl -from kfp.dsl import ArtifactLocation from kfp.dsl._container_op import BaseOp +from kfp.dsl import ArtifactLocation +from urllib.parse import urlparse +import textwrap +import yaml +import re +import os +import copy from .. import tekton_api_version @@ -64,7 +70,7 @@ def _process_base_ops(op: BaseOp): return op -def _op_to_template(op: BaseOp): +def _op_to_template(op: BaseOp, enable_artifacts=False): """Generate template given an operator inherited from BaseOp.""" # NOTE in-place update to BaseOp @@ -78,15 +84,18 @@ def _op_to_template(op: BaseOp): # This should have been as easy as output_artifact_paths.update(op.file_outputs), but the _outputs_to_json function changes the output names and we must do the same here, so that the names are the same output_artifact_paths.update(sorted(((param.full_name, processed_op.file_outputs[param.name]) for param in processed_op.outputs.values()), key=lambda x: x[0])) - output_artifacts = [ - # convert_k8s_obj_to_json( - # ArtifactLocation.create_artifact_for_s3( - # op.artifact_location, - # name=name, - # path=path, - # key='runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz')) - # for name, path in output_artifact_paths.items() - ] + if enable_artifacts: + output_artifacts = [ + convert_k8s_obj_to_json( + ArtifactLocation.create_artifact_for_s3( + op.artifact_location, + name=name, + path=path, + key='runs/$PIPELINERUN/$PODNAME/' + name)) + for name, path in output_artifact_paths.items() + ] + else: + output_artifacts = [] # workflow template container = convert_k8s_obj_to_json( @@ -106,21 +115,98 @@ def _op_to_template(op: BaseOp): } elif isinstance(op, dsl.ResourceOp): - # # no output artifacts - # output_artifacts = [] - # - # # workflow template - # processed_op.resource["manifest"] = yaml.dump( - # convert_k8s_obj_to_json(processed_op.k8s_resource), - # default_flow_style=False - # ) - # template = { - # 'name': processed_op.name, - # 'resource': convert_k8s_obj_to_json( - # processed_op.resource - # ) - # } - raise NotImplementedError("dsl.ResourceOp is not yet implemented") + # no output artifacts + output_artifacts = [] + + # Flatten manifest because it needs to replace Argo variables + manifest = yaml.dump(convert_k8s_obj_to_json(processed_op.k8s_resource), default_flow_style=False) + argo_var = False + if manifest.find('{{workflow.name}}') != -1: + # Kubernetes Pod arguments only take $() as environment variables + manifest = manifest.replace('{{workflow.name}}', 
"$(PIPELINERUN)") + # Remove yaml quote in order to read bash variables + manifest = re.sub('name: \'([^\']+)\'', 'name: \g<1>', manifest) + argo_var = True + + # task template + template = { + 'apiVersion': tekton_api_version, + 'kind': 'Task', + 'metadata': {'name': processed_op.name}, + 'spec': { + "params": [ + { + "description": "Action on the resource", + "name": "action", + "type": "string" + }, + { + "default": "strategic", + "description": "Merge strategy when using action patch", + "name": "merge-strategy", + "type": "string" + }, + { + "default": "", + "description": "An express to retrieval data from resource.", + "name": "output", + "type": "string" + }, + { + "default": "", + "description": "A label selector express to decide if the action on resource is success.", + "name": "success-condition", + "type": "string" + }, + { + "default": "", + "description": "A label selector express to decide if the action on resource is failure.", + "name": "failure-condition", + "type": "string" + }, + { + "default": "index.docker.io/aipipeline/kubeclient:v0.0.2", # Todo: The image need to be replaced, once there are official images from tekton + "description": "Kubectl wrapper image", + "name": "image", + "type": "string" + }, + { + "default": "false", + "description": "Enable set owner reference for created resource.", + "name": "set-ownerreference", + "type": "string" + } + ], + 'steps': [ + { + "args": [ + "--action=$(params.action)", + "--merge-strategy=$(params.merge-strategy)", + "--manifest=%s" % manifest, + "--output=$(params.output)", + "--success-condition=$(params.success-condition)", + "--failure-condition=$(params.failure-condition)", + "--set-ownerreference=$(params.set-ownerreference)" + ], + "image": "$(params.image)", + "name": processed_op.name, + "resources": {} + } + ] + } + } + + # Inject Argo variable replacement as env variables. + if argo_var: + template['spec']['steps'][0]['env'] = [ + {'name': 'PIPELINERUN', 'valueFrom': {'fieldRef': {'fieldPath': "metadata.labels['tekton.dev/pipelineRun']"}}} + ] + + # Add results if exist. 
+ if op.attribute_outputs.items(): + template['spec']['results'] = [] + for output_item in sorted(list(op.attribute_outputs.items()), key=lambda x: x[0]): + template['spec']['results'].append({'name': output_item[0], 'description': output_item[1]}) # initContainers if processed_op.init_containers: @@ -128,84 +214,186 @@ def _op_to_template(op: BaseOp): steps.extend(template['spec']['steps']) template['spec']['steps'] = steps + # initial base_step and volume setup + base_step = { + 'image': 'busybox', + 'name': 'copy-results', + 'script': '#!/bin/sh\nset -exo pipefail\n' + } + volume_mount_step_template = [] + volume_template = [] + mounted_param_paths = [] + replaced_param_list = [] + # inputs input_artifact_paths = processed_op.input_artifact_paths if isinstance(processed_op, dsl.ContainerOp) else None artifact_arguments = processed_op.artifact_arguments if isinstance(processed_op, dsl.ContainerOp) else None inputs = _inputs_to_json(processed_op.inputs, input_artifact_paths, artifact_arguments) if 'parameters' in inputs: - template['spec']['params'] = inputs['parameters'] - elif 'artifacts' in inputs: - raise NotImplementedError("input artifacts are not yet implemented") + if isinstance(processed_op, dsl.ContainerOp): + template['spec']['params'] = inputs['parameters'] + elif isinstance(op, dsl.ResourceOp): + template['spec']['params'].extend(inputs['parameters']) + if 'artifacts' in inputs: + # The input artifacts in KFP is not pulling from s3, it will always be passed as a raw input. + # Visit https://github.com/kubeflow/pipelines/issues/336 for more details on the implementation. + copy_inputs_step = copy.deepcopy(base_step) + copy_inputs_step['name'] = 'copy-inputs' + for artifact in inputs['artifacts']: + if 'raw' in artifact: + copy_inputs_step['script'] += 'echo -n "%s" > %s\n' % (artifact['raw']['data'], artifact['path']) + mountPath = artifact['path'].rsplit("/", 1)[0] + if mountPath not in mounted_param_paths: + volume_mount_step_template.append({'name': artifact['name'], 'mountPath': artifact['path'].rsplit("/", 1)[0]}) + volume_template.append({'name': artifact['name'], 'emptyDir': {}}) + mounted_param_paths.append(mountPath) + copy_inputs_step['script'] = literal_str(copy_inputs_step['script']) + with_inputs_step = [copy_inputs_step] + with_inputs_step.extend(template['spec']['steps']) + template['spec']['steps'] = with_inputs_step + if volume_mount_step_template: + template['spec']['stepTemplate'] = {} + template['spec']['stepTemplate']['volumeMounts'] = volume_mount_step_template + template['spec']['volumes'] = volume_template # outputs if isinstance(op, dsl.ContainerOp): + op_outputs = processed_op.outputs param_outputs = processed_op.file_outputs elif isinstance(op, dsl.ResourceOp): - param_outputs = processed_op.attribute_outputs - outputs_dict = _outputs_to_json(op, processed_op.outputs, param_outputs, output_artifacts) + op_outputs = {} + param_outputs = {} + outputs_dict = _outputs_to_json(op, op_outputs, param_outputs, output_artifacts) if outputs_dict: - """ - Since Tekton results need to be under /tekton/results. If file output paths cannot be - configured to /tekton/results, we need to create the below copy step for moving - file outputs to the Tekton destination. BusyBox is recommended to be used on - small tasks because it's relatively lightweight and small compared to the ubuntu and - bash images. 
- - - image: busybox - name: copy-results - script: | - #!/bin/sh - set -exo pipefail - cp $LOCALPATH $(results.data.path); - """ - template['spec']['results'] = [] - copy_results_step = { - 'image': 'busybox', - 'name': 'copy-results', - 'script': '#!/bin/sh\nset -exo pipefail\n' - } +<<<<<<< HEAD +======= volume_mount_step_template = [] volume_template = [] - mounted_paths = [] - for name, path in processed_op.file_outputs.items(): - name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore - template['spec']['results'].append({ - 'name': name, - 'description': path - }) - # replace all occurrences of the output file path with the Tekton output parameter expression - need_copy_step = True - for s in template['spec']['steps']: - if 'command' in s: - commands = [] - for c in s['command']: - if path in c: - c = c.replace(path, '$(results.%s.path)' % name) - need_copy_step = False - commands.append(c) - s['command'] = commands - if 'args' in s: - args = [] - for a in s['args']: - if path in a: - a = a.replace(path, '$(results.%s.path)' % name) - need_copy_step = False - args.append(a) - s['args'] = args - # If file output path cannot be found/replaced, use emptyDir to copy it to the tekton/results path - if need_copy_step: - copy_results_step['script'] = copy_results_step['script'] + 'cp ' + path + ' $(results.%s.path);' % name + '\n' - mountPath = path.rsplit("/", 1)[0] - if mountPath not in mounted_paths: - volume_mount_step_template.append({'name': name, 'mountPath': path.rsplit("/", 1)[0]}) - volume_template.append({'name': name, 'emptyDir': {}}) - mounted_paths.append(mountPath) - if mounted_paths: + mounted_param_paths = [] + replaced_param_list = [] +>>>>>>> upstream/master + if outputs_dict.get('parameters'): + """ + Since Tekton results need to be under /tekton/results. If file output paths cannot be + configured to /tekton/results, we need to create the below copy step for moving + file outputs to the Tekton destination. BusyBox is recommended to be used on + small tasks because it's relatively lightweight and small compared to the ubuntu and + bash images. 
+ + - image: busybox + name: copy-results + script: | + #!/bin/sh + set -exo pipefail + cp $LOCALPATH $(results.data.path); + """ + template['spec']['results'] = [] +<<<<<<< HEAD + copy_results_step = copy.deepcopy(base_step) +======= + copy_results_step = { + 'image': 'busybox', + 'name': 'copy-results', + 'script': '#!/bin/sh\nset -exo pipefail\n' + } +>>>>>>> upstream/master + for name, path in processed_op.file_outputs.items(): + name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore + template['spec']['results'].append({ + 'name': name, + 'description': path + }) + # replace all occurrences of the output file path with the Tekton output parameter expression + need_copy_step = True + for s in template['spec']['steps']: + if 'command' in s: + commands = [] + for c in s['command']: + if path in c: + c = c.replace(path, '$(results.%s.path)' % name) + need_copy_step = False + commands.append(c) + s['command'] = commands + if 'args' in s: + args = [] + for a in s['args']: + if path in a: + a = a.replace(path, '$(results.%s.path)' % name) + need_copy_step = False + args.append(a) + s['args'] = args + # If file output path cannot be found/replaced, use emptyDir to copy it to the tekton/results path + if need_copy_step: + copy_results_step['script'] = copy_results_step['script'] + 'cp ' + path + ' $(results.%s.path);' % name + '\n' + mountPath = path.rsplit("/", 1)[0] + if mountPath not in mounted_param_paths: + volume_mount_step_template.append({'name': name, 'mountPath': path.rsplit("/", 1)[0]}) + volume_template.append({'name': name, 'emptyDir': {}}) + mounted_param_paths.append(mountPath) + # Record what artifacts are moved to result parameters. + parameter_name = (processed_op.name + '-' + name).replace(' ', '-').replace('_', '-') + replaced_param_list.append(parameter_name) + copy_artifacts_step = {} + if outputs_dict.get('artifacts'): + """ + For storing artifacts, we will be using the minio/mc image because we need to upload artifacts to any type of + object storage and endpoint. The minio/mc is the best image suited for this task because the default KFP + is using minio and it also works well with other s3/gcs type of storage. + + - image: minio/mc + name: copy-artifacts + script: | + #!/usr/bin/env sh + mc config host add storage http://minio-service.$NAMESPACE:9000 $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY + mc cp /tmp/file.txt storage/$(inputs.params.bucket)/runs/$PIPELINERUN/$PODNAME/file.txt + """ + # TODO: Pull default values from KFP configmap when integrated with KFP. 
+ storage_location = outputs_dict['artifacts'][0].get('s3', {}) + insecure = storage_location.get("insecure", True) + endpoint = storage_location.get("endpoint", "minio-service.$NAMESPACE:9000") + # We want to use the insecure flag to figure out whether to use http or https scheme + endpoint = re.sub(r"https?://", "", endpoint) + endpoint = 'http://' + endpoint if insecure else 'https://' + endpoint + access_key = storage_location.get("accessKeySecret", {"name": "mlpipeline-minio-artifact", "key": "accesskey"}) + secret_access_key = storage_location.get("secretKeySecret", {"name": "mlpipeline-minio-artifact", "key": "secretkey"}) + bucket = storage_location.get("bucket", "mlpipeline") + copy_artifacts_step = { + 'image': 'minio/mc', + 'name': 'copy-artifacts', + 'script': textwrap.dedent('''\ + #!/usr/bin/env sh + mc config host add storage %s $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY + ''' % (endpoint)), + 'env': [ + {'name': 'PIPELINERUN', 'valueFrom': {'fieldRef': {'fieldPath': "metadata.labels['tekton.dev/pipelineRun']"}}}, + {'name': 'PODNAME', 'valueFrom': {'fieldRef': {'fieldPath': "metadata.name"}}}, + {'name': 'NAMESPACE', 'valueFrom': {'fieldRef': {'fieldPath': "metadata.namespace"}}}, + {'name': 'AWS_ACCESS_KEY_ID', 'valueFrom': {'secretKeyRef': {'name': access_key['name'], 'key': access_key['key']}}}, + {'name': 'AWS_SECRET_ACCESS_KEY', 'valueFrom': {'secretKeyRef': {'name': secret_access_key['name'], 'key': secret_access_key['key']}}} + ] + } + mounted_artifact_paths = [] + for artifact in outputs_dict['artifacts']: + if artifact['name'] in replaced_param_list: + copy_artifacts_step['script'] = copy_artifacts_step['script'] + \ + 'mc cp $(results.%s.path) storage/%s/runs/$PIPELINERUN/$PODNAME/%s' % (name, bucket, artifact['path'].rsplit("/", 1)[1]) + else: + copy_artifacts_step['script'] = copy_artifacts_step['script'] + \ + 'mc cp %s storage/%s/runs/$PIPELINERUN/$PODNAME/%s' % (artifact['path'], bucket, artifact['path'].rsplit("/", 1)[1]) + if artifact['path'].rsplit("/", 1)[0] not in mounted_artifact_paths: + volume_mount_step_template.append({'name': artifact['name'], 'mountPath': artifact['path'].rsplit("/", 1)[0]}) + volume_template.append({'name': artifact['name'], 'emptyDir': {}}) + mounted_artifact_paths.append(artifact['path'].rsplit("/", 1)[0]) + if mounted_param_paths: copy_results_step['script'] = literal_str(copy_results_step['script']) template['spec']['steps'].append(copy_results_step) + if volume_mount_step_template: template['spec']['stepTemplate'] = {} template['spec']['stepTemplate']['volumeMounts'] = volume_mount_step_template template['spec']['volumes'] = volume_template + if copy_artifacts_step: + copy_artifacts_step['script'] = literal_str(copy_artifacts_step['script']) + template['spec']['steps'].append(copy_artifacts_step) # ********************************************************** # NOTE: the following features are still under development diff --git a/sdk/python/kfp_tekton/compiler/compiler.py b/sdk/python/kfp_tekton/compiler/compiler.py index 8ca50a68d0..39504278c9 100644 --- a/sdk/python/kfp_tekton/compiler/compiler.py +++ b/sdk/python/kfp_tekton/compiler/compiler.py @@ -19,6 +19,7 @@ import itertools import zipfile import re +import textwrap from typing import Callable, Set, List, Text, Dict, Tuple, Any, Union, Optional from ._op_to_template import _op_to_template, literal_str @@ -62,6 +63,13 @@ def my_pipeline(a: int = 1, b: str = "default value"): ``` """ + def __init__(self, **kwargs): + # Intentionally set self.generate_pipeline and 
self.enable_artifacts to None because when _create_pipeline_workflow is called directly + # (e.g. in the case of there being no pipeline decorator), self.generate_pipelinerun and self.enable_artifacts are not set + self.generate_pipelinerun = None + self.enable_artifacts = None + super().__init__(**kwargs) + def _get_loop_task(self, task: Dict, op_name_to_for_loop_op): """Get the list of task references which will flatten the loop parameters defined in pipeline. @@ -115,6 +123,67 @@ def _get_loop_task(self, task: Dict, op_name_to_for_loop_op): del task['params'] return task_list + def _group_to_dag_template(self, group, inputs, outputs, dependencies): + """Generate template given an OpsGroup. + inputs, outputs, dependencies are all helper dicts. + """ + + # Generate GroupOp template + sub_group = group + template = { + 'apiVersion': tekton_api_version, + 'metadata': { + 'name': sub_group.name, + }, + 'spec': {} + } + + # Generate inputs section. + if inputs.get(group.name, None): + template_params = [{'name': x[1] if x[1] else x[0]} for x in inputs[group.name]] + template['spec']['params'] = template_params + + # Generate template sections unique to conditions + if isinstance(sub_group, dsl.OpsGroup) and sub_group.type == 'condition': + subgroup_inputs = inputs.get(sub_group.name, []) + condition = sub_group.condition + + operand1_value = str(condition.operand1) + operand2_value = str(condition.operand2) + + # Generates the operand values and exits gracefully if task outputs are trying to be used + # TODO this can be removed when Conditions support parameter passing from task outputs at which point + # the _resolve_value_or_reference method from kfp.Compiler can be used instead + if isinstance(condition.operand1, dsl.PipelineParam): + parameter_name = self._pipelineparam_full_name(condition.operand1) + task_names = [task_name for param_name, task_name in subgroup_inputs if param_name == parameter_name] + if task_names[0]: + raise TypeError("Conditions do not currently support parameter passing from task outputs") + operand1_value = '$(params.'+parameter_name+')' + if isinstance(condition.operand2, dsl.PipelineParam): + parameter_name = self._pipelineparam_full_name(condition.operand2) + task_names = [task_name for param_name, task_name in subgroup_inputs if param_name == parameter_name] + if task_names[0]: + raise TypeError("Conditions do not currently support parameter passing from task outputs") + operand2_value = '$(params.'+parameter_name+')' + + input_grab = 'EXITCODE=$(python -c \'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n' + try_catch = 'try:\n input1=int(input1)\n input2=int(input2)\nexcept Error:\n input1=str(input1)\n' + if_else = 'print(0) if (input1 ' + condition.operator + ' input2) else print(1)\' ' + operand1_value + ' ' + operand2_value + '); ' + exit_code = 'exit $EXITCODE' + shell_script = input_grab + try_catch + if_else + exit_code + + template['apiVersion'] = 'tekton.dev/v1alpha1' # TODO Change to tekton_api_version once Conditions are out of v1alpha1 + template['kind'] = 'Condition' + template['spec']['check'] = { + 'args': [shell_script], + 'command': ['sh','-c'], + 'image': 'python:alpine3.6', + } + + return template + + + def _create_dag_templates(self, pipeline, op_transformers=None, params=None, op_to_templates_handler=None): """Create all groups and ops templates in the pipeline.
@@ -123,7 +192,8 @@ def _create_dag_templates(self, pipeline, op_transformers=None, params=None, op_ op_transformers: A list of functions that are applied to all ContainerOp instances that are being processed. op_to_templates_handler: Handler which converts a base op into a list of argo templates. """ - op_to_steps_handler = op_to_templates_handler or (lambda op: [_op_to_template(op)]) + + op_to_steps_handler = op_to_templates_handler or (lambda op: [_op_to_template(op, self.enable_artifacts)]) root_group = pipeline.groups[0] # Call the transformation functions before determining the inputs/outputs, otherwise @@ -133,11 +203,48 @@ def _create_dag_templates(self, pipeline, op_transformers=None, params=None, op_ for transformer in op_transformers or []: transformer(op) - tasks = [] + # Generate core data structures to prepare for argo yaml generation + # op_name_to_parent_groups: op name -> list of ancestor groups including the current op + # opsgroups: a dictionary of ospgroup.name -> opsgroup + # inputs, outputs: group/op names -> list of tuples (full_param_name, producing_op_name) + # condition_params: recursive_group/op names -> list of pipelineparam + # dependencies: group/op name -> list of dependent groups/ops. + # Special Handling for the recursive opsgroup + # op_name_to_parent_groups also contains the recursive opsgroups + # condition_params from _get_condition_params_for_ops also contains the recursive opsgroups + # groups does not include the recursive opsgroups + opsgroups = self._get_groups(root_group) + op_name_to_parent_groups = self._get_groups_for_ops(root_group) + opgroup_name_to_parent_groups = self._get_groups_for_opsgroups(root_group) + condition_params = self._get_condition_params_for_ops(root_group) + inputs, outputs = self._get_inputs_outputs( + pipeline, + root_group, + op_name_to_parent_groups, + opgroup_name_to_parent_groups, + condition_params, + {} + ) + dependencies = self._get_dependencies( + pipeline, + root_group, + op_name_to_parent_groups, + opgroup_name_to_parent_groups, + opsgroups, + condition_params, + ) + + templates = [] + for opsgroup in opsgroups.keys(): + # Only Conditions get templates in Tekton + if opsgroups[opsgroup].type == 'condition': + template = self._group_to_dag_template(opsgroups[opsgroup], inputs, outputs, dependencies) + templates.append(template) + for op in pipeline.ops.values(): - tasks.extend(op_to_steps_handler(op)) + templates.extend(op_to_steps_handler(op)) - return tasks + return templates def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeline_conf=None) \ -> List[Dict[Text, Any]]: # Tekton change, signature/return type @@ -154,27 +261,44 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli params.append(param) # generate Tekton tasks from pipeline ops - tasks = self._create_dag_templates(pipeline, op_transformers, params) - - # generate task reference list for Tekton pipeline - task_refs = [ - { - 'name': t['metadata']['name'], - 'taskRef': { - 'name': t['metadata']['name'] - }, - 'params': [{ - 'name': p['name'], - 'value': p.get('default', '') - } for p in t['spec'].get('params', []) - ] - } - for t in tasks - ] + templates = self._create_dag_templates(pipeline, op_transformers, params) + + # generate task and condition reference list for the Tekton Pipeline + condition_refs = {} + task_refs = [] + for template in templates: + if template['kind'] == 'Condition': + condition_refs[template['metadata']['name']] = { + 'conditionRef': template['metadata']['name'], + 
'params': [{ + 'name': param['name'], + 'value': '$(params.'+param['name']+')' + } for param in template['spec'].get('params',[]) + ] + } + else: + task_refs.append( + { + 'name': template['metadata']['name'], + 'taskRef': { + 'name': template['metadata']['name'] + }, + 'params': [{ + 'name': p['name'], + 'value': p.get('default', '') + } for p in template['spec'].get('params', []) + ] + } + ) - # add task dependencies + # add task dependencies and add condition refs to the task ref that depends on the condition + op_name_to_parent_groups = self._get_groups_for_ops(pipeline.groups[0]) for task in task_refs: op = pipeline.ops.get(task['name']) + parent_group = op_name_to_parent_groups.get(task['name'], []) + if parent_group: + if condition_refs.get(parent_group[-2],[]): + task['conditions'] = [condition_refs.get(op_name_to_parent_groups[task['name']][-2],[])] if op.dependent_names: task['runAfter'] = op.dependent_names @@ -205,6 +329,40 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli if op.timeout: task['timeout'] = '%ds' % op.timeout + # handle resourceOp cases in pipeline + for task in task_refs: + op = pipeline.ops.get(task['name']) + if isinstance(op, dsl.ResourceOp): + action = op.resource.get('action') + merge_strategy = op.resource.get('merge_strategy') + success_condition = op.resource.get('successCondition') + failure_condition = op.resource.get('failureCondition') + task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != "image"] + if not merge_strategy: + task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != 'merge-strategy'] + if not success_condition: + task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != 'success-condition'] + if not failure_condition: + task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != "failure-condition"] + for tp in task.get('params', []): + if tp.get('name') == "action" and action: + tp['value'] = action + if tp.get('name') == "merge-strategy" and merge_strategy: + tp['value'] = merge_strategy + if tp.get('name') == "success-condition" and success_condition: + tp['value'] = success_condition + if tp.get('name') == "failure-condition" and failure_condition: + tp['value'] = failure_condition + if tp.get('name') == "output": + output_values = '' + for value in sorted(list(op.attribute_outputs.items()), key=lambda x: x[0]): + output_value = textwrap.dedent("""\ + - name: %s + valueFrom: '%s' + """ % (value[0], value[1])) + output_values += output_value + tp['value'] = literal_str(output_values) + # process loop parameters, keep this section in the behind of other processes, ahead of gen pipeline root_group = pipeline.groups[0] op_name_to_for_loop_op = self._get_for_loop_ops(root_group) @@ -232,10 +390,11 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli } # append Task and Pipeline documents - workflow = tasks + [pipeline_template] + workflow = templates + [pipeline_template] # Generate pipelinerun if generate-pipelinerun flag is enabled # The base templete is generated first and then insert optional parameters. + # Wrapped in a try catch for when this method is called directly (e.g. 
there is no pipeline decorator) if self.generate_pipelinerun: pipelinerun = { 'apiVersion': tekton_api_version, @@ -255,7 +414,6 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli } } - pod_template = {} for task in task_refs: op = pipeline.ops.get(task['name']) @@ -273,25 +431,22 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli if pipeline_conf.timeout: pipelinerun['spec']['timeout'] = '%ds' % pipeline_conf.timeout - workflow = workflow + [pipelinerun] + # generate the Tekton service account template + service_template = {} + if len(pipeline_conf.image_pull_secrets) > 0: + service_template = { + 'apiVersion': 'v1', + 'kind': 'ServiceAccount', + 'metadata': {'name': pipelinerun['metadata']['name'] + '-sa'} + } + for image_pull_secret in pipeline_conf.image_pull_secrets: + service_template['imagePullSecrets'] = [{'name': image_pull_secret.name}] - # Use regex to replace all the Argo variables to Tekton variables. For variables that are unique to Argo, - # we raise an Error to alert users about the unsupported variables. Here is the list of Argo variables. - # https://github.com/argoproj/argo/blob/master/docs/variables.md - # Since Argo variables can be used in anywhere in the yaml, we need to dump and then parse the whole yaml - # using regular expression. - workflow_dump = json.dumps(workflow) - tekton_var_regex_rules = [ - {'argo_rule': '{{inputs.parameters.([^ \t\n.:,;{}]+)}}', 'tekton_rule': '$(inputs.params.\g<1>)'}, - {'argo_rule': '{{outputs.parameters.([^ \t\n.:,;{}]+).path}}', 'tekton_rule': '$(results.\g<1>.path)'} - ] - for regex_rule in tekton_var_regex_rules: - workflow_dump = re.sub(regex_rule['argo_rule'], regex_rule['tekton_rule'], workflow_dump) + if service_template: + workflow = workflow + [service_template] + pipelinerun['spec']['serviceAccountName'] = service_template['metadata']['name'] - unsupported_vars = re.findall(r"{{[^ \t\n:,;{}]+}}", workflow_dump) - if unsupported_vars: - raise ValueError('These Argo variables are not supported in Tekton Pipeline: %s' % ", ".join(str(v) for v in set(unsupported_vars))) - workflow = json.loads(workflow_dump) + workflow = workflow + [pipelinerun] return workflow # Tekton change, from return type Dict[Text, Any] to List[Dict[Text, Any]] @@ -390,7 +545,8 @@ def compile(self, package_path, type_check=True, pipeline_conf: dsl.PipelineConf = None, - generate_pipelinerun=False): + generate_pipelinerun=False, + enable_artifacts=False): """Compile the given pipeline function into workflow yaml. Args: pipeline_func: pipeline functions with @dsl.pipeline decorator. @@ -402,6 +558,7 @@ def compile(self, generate_pipelinerun: Generate pipelinerun yaml for Tekton pipeline compilation. """ self.generate_pipelinerun = generate_pipelinerun + self.enable_artifacts = enable_artifacts super().compile(pipeline_func, package_path, type_check, pipeline_conf=pipeline_conf) @staticmethod @@ -417,6 +574,22 @@ def _write_workflow(workflow: List[Dict[Text, Any]], # Tekton change, signature yaml.Dumper.ignore_aliases = lambda *args : True yaml_text = yaml.dump_all(workflow, default_flow_style=False) # Tekton change + # Use regex to replace all the Argo variables to Tekton variables. For variables that are unique to Argo, + # we raise an Error to alert users about the unsupported variables. Here is the list of Argo variables. 
+ # https://github.com/argoproj/argo/blob/master/docs/variables.md + # Since Argo variables can be used in anywhere in the yaml, we need to dump and then parse the whole yaml + # using regular expression. + tekton_var_regex_rules = [ + {'argo_rule': '{{inputs.parameters.([^ \t\n.:,;{}]+)}}', 'tekton_rule': '$(inputs.params.\g<1>)'}, + {'argo_rule': '{{outputs.parameters.([^ \t\n.:,;{}]+).path}}', 'tekton_rule': '$(results.\g<1>.path)'} + ] + for regex_rule in tekton_var_regex_rules: + yaml_text = re.sub(regex_rule['argo_rule'], regex_rule['tekton_rule'], yaml_text) + + unsupported_vars = re.findall(r"{{[^ \t\n.:,;{}]+\.[^ \t\n:,;{}]+}}", yaml_text) + if unsupported_vars: + raise ValueError('These Argo variables are not supported in Tekton Pipeline: %s' % ", ".join(str(v) for v in set(unsupported_vars))) + if '{{pipelineparam' in yaml_text: raise RuntimeError( 'Internal compiler error: Found unresolved PipelineParam. ' diff --git a/sdk/python/kfp_tekton/compiler/main.py b/sdk/python/kfp_tekton/compiler/main.py index d03e917ea5..25b298ddc9 100644 --- a/sdk/python/kfp_tekton/compiler/main.py +++ b/sdk/python/kfp_tekton/compiler/main.py @@ -47,12 +47,15 @@ def parse_arguments(): parser.add_argument('--generate-pipelinerun', action='store_true', help='enable pipelinerun yaml generation') + parser.add_argument('--enable-artifacts', + action='store_true', + help='enable artifact inputs and outputs') args = parser.parse_args() return args -def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check, generate_pipelinerun=False): +def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check, generate_pipelinerun=False, enable_artifacts=False): if len(pipeline_funcs) == 0: raise ValueError('A function with @dsl.pipeline decorator is required in the py file.') @@ -68,16 +71,16 @@ def _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_ else: pipeline_func = pipeline_funcs[0] - TektonCompiler().compile(pipeline_func, output_path, type_check, generate_pipelinerun=generate_pipelinerun) + TektonCompiler().compile(pipeline_func, output_path, type_check, generate_pipelinerun=generate_pipelinerun, enable_artifacts=enable_artifacts) -def compile_pyfile(pyfile, function_name, output_path, type_check, generate_pipelinerun=False): +def compile_pyfile(pyfile, function_name, output_path, type_check, generate_pipelinerun=False, enable_artifacts=False): sys.path.insert(0, os.path.dirname(pyfile)) try: filename = os.path.basename(pyfile) with kfp_compiler_main.PipelineCollectorContext() as pipeline_funcs: __import__(os.path.splitext(filename)[0]) - _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check, generate_pipelinerun) + _compile_pipeline_function(pipeline_funcs, function_name, output_path, type_check, generate_pipelinerun, enable_artifacts) finally: del sys.path[0] @@ -89,7 +92,7 @@ def main(): (args.py is not None and args.package is not None)): raise ValueError('Either --py or --package is needed but not both.') if args.py: - compile_pyfile(args.py, args.function, args.output, not args.disable_type_check, args.generate_pipelinerun) + compile_pyfile(args.py, args.function, args.output, not args.disable_type_check, args.generate_pipelinerun, args.enable_artifacts) else: if args.namespace is None: raise ValueError('--namespace is required for compiling packages.') diff --git a/sdk/python/tests/README.md b/sdk/python/tests/README.md index 0def3b2445..2d527b0853 100644 --- a/sdk/python/tests/README.md +++ 
b/sdk/python/tests/README.md @@ -1,24 +1,31 @@ ## Tests for the Compiler -The test listed here can be used to compile all Python DSL pipelines in the KFP compiler testdata folder and generate a report card. As you are doing a PR to address functionality gaps in compiler, please run this test to ensure that they have been addressed. Please note that even if a Kubeflow Pipeline Python DSL script may pass the compilation with the KFP-Tekton compiler successfully, the produced Tekton YAML might not be valid or contain all of the intended functionality as the equivalent Argo YAML produced by the KFP compiler. For that, best way is to take the compiled YAML and run it on Tekton directly. +The test listed here can be used to compile all Python DSL pipelines in the KFP compiler `testdata` folder and +generate a report card. As you are working on a PR to address functionality gaps in the compiler, please run this test to +update the `FAILURE` entries which have been addressed. -### Running the test +Please note that even if a Kubeflow Pipeline Python DSL script passes the compilation with the KFP-Tekton compiler +successfully, the produced Tekton YAML might not be valid or might not contain all of the intended functionality of the +equivalent Argo YAML produced by the KFP compiler. For that, the best way is to take the compiled YAML and run it +on Tekton directly. + +### Running the tests - `./sdk/python/tests/test_kfp_samples.sh` -You should see an output similar to the one below, outlining which samples have passed and which are failing. +You should see an output similar to the one below, outlining which test scripts have passed and which are failing: -```bash +```YAML SUCCESS: add_pod_env.py SUCCESS: artifact_location.py -FAILURE: basic.py +SUCCESS: basic.py FAILURE: basic_no_decorator.py SUCCESS: coin.py FAILURE: compose.py SUCCESS: default_value.py FAILURE: input_artifact_raw_value.py -SUCCESS: loop_over_lightweight_output.py +FAILURE: loop_over_lightweight_output.py SUCCESS: param_op_transform.py FAILURE: param_substitutions.py SUCCESS: pipelineparams.py @@ -36,18 +43,20 @@ FAILURE: volumeop_parallel.py FAILURE: volumeop_sequential.py SUCCESS: withitem_basic.py SUCCESS: withitem_nested.py -SUCCESS: withparam_global.py -SUCCESS: withparam_global_dict.py -SUCCESS: withparam_output.py -SUCCESS: withparam_output_dict.py +FAILURE: withparam_global.py +FAILURE: withparam_global_dict.py +FAILURE: withparam_output.py +FAILURE: withparam_output_dict.py -Success: 18 -Failure: 12 +Success: 14 +Failure: 16 Total: 30 -The compilation status report was stored in /kfp-tekton/sdk/python/tests/test_kfp_samples_report.txt -The accumulated console logs can be found in /kfp-tekton/temp/test_kfp_samples_output.txt +Compilation status report: sdk/python/tests/test_kfp_samples_report.txt +Accumulated compiler logs: temp/test_kfp_samples_output.txt +Compiled Tekton YAML files: temp/tekton_compiler_output/ ``` -Goal should be to have all the 30 tests pass before we have a fair degree of confidence that the compile can handle a fair number of pipelines. +The goal should be to have all 30 tests pass before we can have a degree of confidence that the compiler can handle +a fair number of pipelines.
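For reference, the report above is essentially a loop over the KFP `testdata` samples. A rough, hypothetical Python equivalent is sketched below; the `KFP_TESTDATA` path is an assumption, and the real report format is produced by `test_kfp_samples.sh`, not this snippet.

```python
# Hypothetical sketch of what test_kfp_samples.sh automates: try to compile every
# KFP testdata sample with dsl-compile-tekton and print a SUCCESS/FAILURE line.
import glob
import os
import subprocess

KFP_TESTDATA = 'kfp/sdk/python/tests/compiler/testdata'  # assumed path to a local KFP checkout

success = failure = 0
for sample in sorted(glob.glob(os.path.join(KFP_TESTDATA, '*.py'))):
    # one compilation attempt per sample; a non-zero exit code counts as FAILURE
    result = subprocess.run(['dsl-compile-tekton', '--py', sample, '--output', '/tmp/pipeline.yaml'],
                            capture_output=True)
    if result.returncode == 0:
        success += 1
        print('SUCCESS: %s' % os.path.basename(sample))
    else:
        failure += 1
        print('FAILURE: %s' % os.path.basename(sample))

print('\nSuccess: %d\nFailure: %d\nTotal: %d' % (success, failure, success + failure))
```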
diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index 5c65f5591e..9e7211e89e 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -124,6 +124,27 @@ def test_timeout_workflow(self): from .testdata.timeout import timeout_sample_pipeline self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout.yaml') + def test_resourceOp_workflow(self): + """ + Test compiling a resourceOp basic workflow. + """ + from .testdata.resourceop_basic import resourceop_basic + self._test_pipeline_workflow(resourceop_basic, 'resourceop_basic.yaml') + + def test_volumeOp_workflow(self): + """ + Test compiling a volumeOp basic workflow. + """ + from .testdata.volume_op import volumeop_basic + self._test_pipeline_workflow(volumeop_basic, 'volume_op.yaml') + + def test_volumeSnapshotOp_workflow(self): + """ + Test compiling a volumeSnapshotOp basic workflow. + """ + from .testdata.volume_snapshot_op import volume_snapshotop_sequential + self._test_pipeline_workflow(volume_snapshotop_sequential, 'volume_snapshot_op.yaml') + def test_hidden_output_file_workflow(self): """ Test compiling a workflow with non configurable output file. @@ -159,17 +180,49 @@ def test_pipeline_transformers_workflow(self): from .testdata.pipeline_transformers import transform_pipeline self._test_pipeline_workflow(transform_pipeline, 'pipeline_transformers.yaml') + def test_artifact_location_workflow(self): + """ + Test compiling an artifact location workflow. + """ + from .testdata.artifact_location import custom_artifact_location + self._test_pipeline_workflow(custom_artifact_location, 'artifact_location.yaml', enable_artifacts=True) + + def test_input_artifact_raw_value_workflow(self): + """ + Test compiling an input artifact workflow. + """ + from .testdata.input_artifact_raw_value import input_artifact_pipeline + self._test_pipeline_workflow(input_artifact_pipeline, 'input_artifact_raw_value.yaml') + + def test_katib_workflow(self): + """ + Test compiling a katib workflow. + """ + from .testdata.katib import mnist_hpo + self._test_pipeline_workflow(mnist_hpo, 'katib.yaml') + + def test_imagepullsecrets_workflow(self): + """ + Test compiling a imagepullsecrets workflow. 
+ """ + from .testdata.imagepullsecrets import imagepullsecrets_pipeline + self._test_pipeline_workflow(imagepullsecrets_pipeline, 'imagepullsecrets.yaml', generate_pipelinerun=True) + def _test_pipeline_workflow(self, pipeline_function, pipeline_yaml, generate_pipelinerun=False, + enable_artifacts=False, normalize_compiler_output_function=None): test_data_dir = os.path.join(os.path.dirname(__file__), 'testdata') golden_yaml_file = os.path.join(test_data_dir, pipeline_yaml) temp_dir = tempfile.mkdtemp() compiled_yaml_file = os.path.join(temp_dir, 'workflow.yaml') try: - compiler.TektonCompiler().compile(pipeline_function, compiled_yaml_file, generate_pipelinerun=generate_pipelinerun) + compiler.TektonCompiler().compile(pipeline_function, + compiled_yaml_file, + generate_pipelinerun=generate_pipelinerun, + enable_artifacts=enable_artifacts) with open(compiled_yaml_file, 'r') as f: f = normalize_compiler_output_function( f.read()) if normalize_compiler_output_function else f diff --git a/sdk/python/tests/compiler/testdata/artifact_location.py b/sdk/python/tests/compiler/testdata/artifact_location.py new file mode 100644 index 0000000000..ab11cfb198 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/artifact_location.py @@ -0,0 +1,53 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp +from kfp import dsl +from kubernetes.client import V1SecretKeySelector + + +@dsl.pipeline( + name="custom_artifact_location_pipeline", + description="""A pipeline to demonstrate how to configure the artifact + location for all the ops in the pipeline. 
The default parameters are + set to run on the kubeflow namespace along with kfp""", +) +def custom_artifact_location( + secret_name: str = "mlpipeline-minio-artifact", + tag: str = '1.31.0', + namespace: str = "kubeflow", + bucket: str = "mlpipeline" +): + + # configures artifact location + pipeline_artifact_location = dsl.ArtifactLocation.s3( + bucket=bucket, + endpoint="minio-service.%s:9000" % namespace, # parameterize minio-service endpoint + insecure=True, + access_key_secret=V1SecretKeySelector(name=secret_name, key="accesskey"), + secret_key_secret={"name": secret_name, "key": "secretkey"}, # accepts dict also + ) + + # set pipeline level artifact location + dsl.get_pipeline_conf().set_artifact_location(pipeline_artifact_location) + + # artifacts in this op are stored to endpoint `minio-service.:9000` + op = dsl.ContainerOp(name="generate-output", image="busybox:%s" % tag, + command=['sh', '-c', 'echo hello > /tmp/output.txt'], + file_outputs={'output': '/tmp/output.txt'}) + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(custom_artifact_location, __file__.replace('.py', '.yaml'), enable_artifacts=True) diff --git a/sdk/python/tests/compiler/testdata/artifact_location.yaml b/sdk/python/tests/compiler/testdata/artifact_location.yaml new file mode 100644 index 0000000000..f40e26b1c2 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/artifact_location.yaml @@ -0,0 +1,100 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: generate-output +spec: + params: + - name: bucket + - name: namespace + - name: secret_name + - name: tag + results: + - description: /tmp/output.txt + name: output + steps: + - command: + - sh + - -c + - echo hello > $(results.output.path) + image: busybox:$(inputs.params.tag) + name: generate-output + - env: + - name: PIPELINERUN + valueFrom: + fieldRef: + fieldPath: metadata.labels['tekton.dev/pipelineRun'] + - name: PODNAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + key: accesskey + name: $(inputs.params.secret_name) + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + key: secretkey + name: $(inputs.params.secret_name) + image: minio/mc + name: copy-artifacts + script: |- + #!/usr/bin/env sh + mc config host add storage http://minio-service.$(inputs.params.namespace):9000 $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY + mc cp $(results.output.path) storage/$(inputs.params.bucket)/runs/$PIPELINERUN/$PODNAME/output.txt +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "A pipeline to demonstrate + how to configure the artifact\n location for all the ops in the pipeline. 
+ The default parameters are\n set to run on the kubeflow namespace along with + kfp", "inputs": [{"default": "mlpipeline-minio-artifact", "name": "secret_name", + "optional": true, "type": "String"}, {"default": "1.31.0", "name": "tag", "optional": + true, "type": "String"}, {"default": "kubeflow", "name": "namespace", "optional": + true, "type": "String"}, {"default": "mlpipeline", "name": "bucket", "optional": + true, "type": "String"}], "name": "custom_artifact_location_pipeline"}' + name: custom-artifact-location-pipeline +spec: + params: + - default: mlpipeline-minio-artifact + name: secret_name + - default: 1.31.0 + name: tag + - default: kubeflow + name: namespace + - default: mlpipeline + name: bucket + tasks: + - name: generate-output + params: + - name: bucket + value: $(params.bucket) + - name: namespace + value: $(params.namespace) + - name: secret_name + value: $(params.secret_name) + - name: tag + value: $(params.tag) + taskRef: + name: generate-output diff --git a/sdk/python/tests/compiler/testdata/condition.py b/sdk/python/tests/compiler/testdata/condition.py new file mode 100644 index 0000000000..264f3c6ec2 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/condition.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import kfp +from kfp import dsl + +def print_op(msg): + """Print a message.""" + return dsl.ContainerOp( + name='Print', + image='alpine:3.6', + command=['echo', msg], + ) + + +@dsl.pipeline( + name='Conditional Example Pipeline', + description='Shows how to use dsl.Condition().' 
+) +def conditional_pipeline(num: int = 5): + + with dsl.Condition(num == 5): + print_op('Number is equal to 5') + with dsl.Condition(num != 5): + print_op('Number is not equal to 5') + + +if __name__ == '__main__': + kfp.compiler.Compiler().compile(conditional_pipeline, __file__ + '.yaml') diff --git a/sdk/python/tests/compiler/testdata/condition.yaml b/sdk/python/tests/compiler/testdata/condition.yaml new file mode 100644 index 0000000000..bf1abc2464 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/condition.yaml @@ -0,0 +1,89 @@ +apiVersion: tekton.dev/v1alpha1 +kind: Condition +metadata: + name: condition-1 +spec: + check: + args: + - "EXITCODE=$(python -c 'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n\ + try:\n input1=int(input1)\n input2=int(input2)\nexcept Error:\n input1=str(input1)\n\ + print(0) if (input1 == input2) else print(1)' $(params.num) 5); exit $EXITCODE" + command: + - sh + - -c + image: python:alpine3.6 + params: + - name: num +--- +apiVersion: tekton.dev/v1alpha1 +kind: Condition +metadata: + name: condition-2 +spec: + check: + args: + - "EXITCODE=$(python -c 'import sys\ninput1=str.rstrip(sys.argv[1])\ninput2=str.rstrip(sys.argv[2])\n\ + try:\n input1=int(input1)\n input2=int(input2)\nexcept Error:\n input1=str(input1)\n\ + print(0) if (input1 != input2) else print(1)' $(params.num) 5); exit $EXITCODE" + command: + - sh + - -c + image: python:alpine3.6 + params: + - name: num +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: print +spec: + steps: + - command: + - echo + - Number is equal to 5 + image: alpine:3.6 + name: print +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: print-2 +spec: + steps: + - command: + - echo + - Number is not equal to 5 + image: alpine:3.6 + name: print-2 +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "Shows how to use dsl.Condition().", + "inputs": [{"default": "5", "name": "num", "optional": true, "type": "Integer"}], + "name": "Conditional Example Pipeline"}' + name: conditional-example-pipeline +spec: + params: + - default: '5' + name: num + tasks: + - conditions: + - conditionRef: condition-1 + params: + - name: num + value: $(params.num) + name: print + params: [] + taskRef: + name: print + - conditions: + - conditionRef: condition-2 + params: + - name: num + value: $(params.num) + name: print-2 + params: [] + taskRef: + name: print-2 diff --git a/sdk/python/tests/compiler/testdata/imagepullsecrets.py b/sdk/python/tests/compiler/testdata/imagepullsecrets.py new file mode 100644 index 0000000000..2721e39ca9 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/imagepullsecrets.py @@ -0,0 +1,61 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Toy example demonstrating how to specify imagepullsecrets to access protected +container registry. 
+""" + +import kfp +import kfp.dsl as dsl +from kubernetes import client as k8s_client + + +class GetFrequentWordOp(dsl.ContainerOp): + """A get frequent word class representing a component in ML Pipelines. + + The class provides a nice interface to users by hiding details such as container, + command, arguments. + """ + def __init__(self, name, message): + """Args: + name: An identifier of the step which needs to be unique within a pipeline. + message: a dsl.PipelineParam object representing an input message. + """ + super(GetFrequentWordOp, self).__init__( + name=name, + image='python:3.5-jessie', + command=['sh', '-c'], + arguments=['python -c "from collections import Counter; ' + 'words = Counter(\'%s\'.split()); print(max(words, key=words.get))" ' + '| tee /tmp/message.txt' % message], + file_outputs={'word': '/tmp/message.txt'}) + +@dsl.pipeline( + name='Save Most Frequent', + description='Get Most Frequent Word and Save to GCS' +) +# def save_most_frequent_word(message: str): +def imagepullsecrets_pipeline(message="This is testing"): + """A pipeline function describing the orchestration of the workflow.""" + + counter = GetFrequentWordOp( + name='get-Frequent', + message=message) + # Call set_image_pull_secrets after get_pipeline_conf(). + dsl.get_pipeline_conf()\ + .set_image_pull_secrets([k8s_client.V1ObjectReference(name="secretA")]) + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(imagepullsecrets_pipeline, __file__.replace('.py', '.yaml'), generate_pipelinerun=True) diff --git a/sdk/python/tests/compiler/testdata/imagepullsecrets.yaml b/sdk/python/tests/compiler/testdata/imagepullsecrets.yaml new file mode 100644 index 0000000000..360b6a655f --- /dev/null +++ b/sdk/python/tests/compiler/testdata/imagepullsecrets.yaml @@ -0,0 +1,72 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: get-frequent +spec: + params: + - name: message + results: + - description: /tmp/message.txt + name: word + steps: + - args: + - python -c "from collections import Counter; words = Counter('$(inputs.params.message)'.split()); + print(max(words, key=words.get))" | tee $(results.word.path) + command: + - sh + - -c + image: python:3.5-jessie + name: get-frequent +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "Get Most Frequent Word + and Save to GCS", "inputs": [{"default": "This is testing", "name": "message", + "optional": true}], "name": "Save Most Frequent"}' + name: save-most-frequent +spec: + params: + - default: This is testing + name: message + tasks: + - name: get-frequent + params: + - name: message + value: $(params.message) + taskRef: + name: get-frequent +--- +apiVersion: v1 +imagePullSecrets: +- name: secretA +kind: ServiceAccount +metadata: + name: save-most-frequent-run-sa +--- +apiVersion: tekton.dev/v1beta1 +kind: PipelineRun +metadata: + name: save-most-frequent-run +spec: + params: + - name: message + value: This is testing + pipelineRef: + name: save-most-frequent + serviceAccountName: save-most-frequent-run-sa diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py new file mode 100644 index 0000000000..3c2d6191a3 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py @@ -0,0 +1,70 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import sys +from pathlib import Path + +sys.path.insert(0, __file__ + '/../../../../') + +import kfp +from kfp import dsl + + +def component_with_inline_input_artifact(text: str): + return dsl.ContainerOp( + name='component_with_inline_input_artifact', + image='alpine', + command=['cat', dsl.InputArgumentPath(text, path='/tmp/inputs/text/data', input='text')], # path and input are optional + ) + + +def component_with_input_artifact(text): + '''A component that passes text as input artifact''' + + return dsl.ContainerOp( + name='component_with_input_artifact', + artifact_argument_paths=[ + dsl.InputArgumentPath(argument=text, path='/tmp/inputs/text/data', input='text'), # path and input are optional + ], + image='alpine', + command=['cat', '/tmp/inputs/text/data'], + ) + +def component_with_hardcoded_input_artifact_value(): + '''A component that passes hard-coded text as input artifact''' + return component_with_input_artifact('hard-coded artifact value') + + +def component_with_input_artifact_value_from_file(file_path): + '''A component that passes contents of a file as input artifact''' + return component_with_input_artifact(Path(file_path).read_text()) + + +@dsl.pipeline( + name='Pipeline with artifact input raw argument value.', + description='Pipeline shows how to define artifact inputs and pass raw artifacts to them.' 
+) +def input_artifact_pipeline(): + component_with_inline_input_artifact('Constant artifact value') + component_with_input_artifact('Constant artifact value') + component_with_hardcoded_input_artifact_value() + + file_path = str(Path(__file__).parent.joinpath('input_artifact_raw_value.txt')) + component_with_input_artifact_value_from_file(file_path) + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(input_artifact_pipeline, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt new file mode 100644 index 0000000000..7953e7335b --- /dev/null +++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt @@ -0,0 +1 @@ +Text from a file with hard-coded artifact value diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml new file mode 100644 index 0000000000..9a7c9583a6 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml @@ -0,0 +1,142 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: component-with-inline-input-artifact +spec: + stepTemplate: + volumeMounts: + - mountPath: /tmp/inputs/text + name: text + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "Constant artifact value" > /tmp/inputs/text/data + - command: + - cat + - /tmp/inputs/text/data + image: alpine + name: component-with-inline-input-artifact + volumes: + - emptyDir: {} + name: text +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: component-with-input-artifact +spec: + stepTemplate: + volumeMounts: + - mountPath: /tmp/inputs/text + name: text + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "Constant artifact value" > /tmp/inputs/text/data + - command: + - cat + - /tmp/inputs/text/data + image: alpine + name: component-with-input-artifact + volumes: + - emptyDir: {} + name: text +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: component-with-input-artifact-2 +spec: + stepTemplate: + volumeMounts: + - mountPath: /tmp/inputs/text + name: text + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "hard-coded artifact value" > /tmp/inputs/text/data + - command: + - cat + - /tmp/inputs/text/data + image: alpine + name: component-with-input-artifact-2 + volumes: + - emptyDir: {} + name: text +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: component-with-input-artifact-3 +spec: + stepTemplate: + volumeMounts: + - mountPath: /tmp/inputs/text + name: text + steps: + - image: busybox + name: copy-inputs + script: | + #!/bin/sh + set -exo pipefail + echo -n "Text from a file with hard-coded artifact value + " > /tmp/inputs/text/data + - command: + - cat + - /tmp/inputs/text/data + image: alpine + name: component-with-input-artifact-3 + volumes: + - emptyDir: {} + name: text +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "Pipeline shows how to + define artifact inputs and pass raw artifacts to them.", "name": "Pipeline with + artifact input raw argument value."}' + name: pipeline-with-artifact-input-raw-argument-value +spec: + params: [] + tasks: + - name: component-with-inline-input-artifact + params: [] + taskRef: + name: component-with-inline-input-artifact + - name: component-with-input-artifact + params: [] + taskRef: + name: component-with-input-artifact + - name: component-with-input-artifact-2 + params: [] + taskRef: + name: component-with-input-artifact-2 + - name: component-with-input-artifact-3 + params: [] + taskRef: + name: component-with-input-artifact-3 diff --git a/sdk/python/tests/compiler/testdata/katib.py b/sdk/python/tests/compiler/testdata/katib.py new file mode 100644 index 0000000000..fa2d13ae35 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/katib.py @@ -0,0 +1,129 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import kfp.dsl as dsl + +@dsl.pipeline( + name="Launch katib experiment", + description="An example to launch katib experiment." +) +def mnist_hpo( + name="mnist", + namespace="kubeflow", + goal=0.99, + parallelTrialCount=3, + maxTrialCount=12, + experimentTimeoutMinutes=60, + deleteAfterDone=True): + objectiveConfig = { + "type": "maximize", + "goal": goal, + "objectiveMetricName": "Validation-accuracy", + "additionalMetricNames": ["accuracy"] + } + algorithmConfig = {"algorithmName" : "random"} + parameters = [ + {"name": "--lr", "parameterType": "double", "feasibleSpace": {"min": "0.01","max": "0.03"}}, + {"name": "--num-layers", "parameterType": "int", "feasibleSpace": {"min": "2", "max": "5"}}, + {"name": "--optimizer", "parameterType": "categorical", "feasibleSpace": {"list": ["sgd", "adam", "ftrl"]}} + ] + rawTemplate = { + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": "{{.Trial}}", + "namespace": "{{.NameSpace}}" + }, + "spec": { + "template": { + "spec": { + "restartPolicy": "Never", + "containers": [ + {"name": "{{.Trial}}", + "image": "docker.io/katib/mxnet-mnist-example", + "command": [ + "python /mxnet/example/image-classification/train_mnist.py --batch-size=64 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}" + ] + } + ] + } + } + } + } + trialTemplate = { + "goTemplate": { + "rawTemplate": json.dumps(rawTemplate) + } + } + op1 = katib_experiment_launcher_op( + name, + namespace, + parallelTrialCount=parallelTrialCount, + maxTrialCount=maxTrialCount, + objectiveConfig=str(objectiveConfig), + algorithmConfig=str(algorithmConfig), + trialTemplate=str(trialTemplate), + parameters=str(parameters), + experimentTimeoutMinutes=experimentTimeoutMinutes, + deleteAfterDone=deleteAfterDone + ) + + op_out = dsl.ContainerOp( + name="my-out-cop", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo hyperparameter: %s" % op1.output], + ) + + +def katib_experiment_launcher_op( + name, + namespace, + maxTrialCount=100, + parallelTrialCount=3, + maxFailedTrialCount=3, + objectiveConfig='{}', + algorithmConfig='{}', + metricsCollector='{}', + trialTemplate='{}', + parameters='[]', + experimentTimeoutMinutes=60, + deleteAfterDone=True, + outputFile='/output.txt'): + return dsl.ContainerOp( + name = "mnist-hpo", + image = 'liuhougangxa/katib-experiment-launcher:latest', + arguments = [ + '--name', name, + '--namespace', namespace, + '--maxTrialCount', maxTrialCount, + '--maxFailedTrialCount', maxFailedTrialCount, + '--parallelTrialCount', parallelTrialCount, + '--objectiveConfig', objectiveConfig, + '--algorithmConfig', algorithmConfig, + '--metricsCollector', metricsCollector, + '--trialTemplate', trialTemplate, + '--parameters', parameters, + '--outputFile', outputFile, + '--deleteAfterDone', deleteAfterDone, + '--experimentTimeoutMinutes', experimentTimeoutMinutes, + ], + file_outputs = {'bestHyperParameter': outputFile} + ) + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(mnist_hpo, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/katib.yaml b/sdk/python/tests/compiler/testdata/katib.yaml new file mode 100644 index 0000000000..8a5d559f33 --- /dev/null +++ 
b/sdk/python/tests/compiler/testdata/katib.yaml @@ -0,0 +1,141 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: mnist-hpo +spec: + params: + - name: deleteAfterDone + - name: experimentTimeoutMinutes + - name: goal + - name: maxTrialCount + - name: name + - name: namespace + - name: parallelTrialCount + results: + - description: /output.txt + name: bestHyperParameter + steps: + - args: + - --name + - $(inputs.params.name) + - --namespace + - $(inputs.params.namespace) + - --maxTrialCount + - $(inputs.params.maxTrialCount) + - --maxFailedTrialCount + - '3' + - --parallelTrialCount + - $(inputs.params.parallelTrialCount) + - --objectiveConfig + - '{''type'': ''maximize'', ''goal'': $(inputs.params.goal), ''objectiveMetricName'': + ''Validation-accuracy'', ''additionalMetricNames'': [''accuracy'']}' + - --algorithmConfig + - '{''algorithmName'': ''random''}' + - --metricsCollector + - '{}' + - --trialTemplate + - '{''goTemplate'': {''rawTemplate'': ''{"apiVersion": "batch/v1", "kind": "Job", + "metadata": {"name": "{{.Trial}}", "namespace": "{{.NameSpace}}"}, "spec": {"template": + {"spec": {"restartPolicy": "Never", "containers": [{"name": "{{.Trial}}", "image": + "docker.io/katib/mxnet-mnist-example", "command": ["python /mxnet/example/image-classification/train_mnist.py + --batch-size=64 {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} + {{- end}} {{- end}}"]}]}}}}''}}' + - --parameters + - '[{''name'': ''--lr'', ''parameterType'': ''double'', ''feasibleSpace'': {''min'': + ''0.01'', ''max'': ''0.03''}}, {''name'': ''--num-layers'', ''parameterType'': + ''int'', ''feasibleSpace'': {''min'': ''2'', ''max'': ''5''}}, {''name'': ''--optimizer'', + ''parameterType'': ''categorical'', ''feasibleSpace'': {''list'': [''sgd'', + ''adam'', ''ftrl'']}}]' + - --outputFile + - $(results.bestHyperParameter.path) + - --deleteAfterDone + - $(inputs.params.deleteAfterDone) + - --experimentTimeoutMinutes + - $(inputs.params.experimentTimeoutMinutes) + image: liuhougangxa/katib-experiment-launcher:latest + name: mnist-hpo +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: my-out-cop +spec: + params: + - name: mnist-hpo-bestHyperParameter + steps: + - args: + - 'echo hyperparameter: $(inputs.params.mnist-hpo-bestHyperParameter)' + command: + - sh + - -c + image: library/bash:4.4.23 + name: my-out-cop +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "An example to launch katib + experiment.", "inputs": [{"default": "mnist", "name": "name", "optional": true}, + {"default": "kubeflow", "name": "namespace", "optional": true}, {"default": + "0.99", "name": "goal", "optional": true}, {"default": "3", "name": "parallelTrialCount", + "optional": true}, {"default": "12", "name": "maxTrialCount", "optional": true}, + {"default": "60", "name": "experimentTimeoutMinutes", "optional": true}, {"default": + "True", 
"name": "deleteAfterDone", "optional": true}], "name": "Launch katib + experiment"}' + name: launch-katib-experiment +spec: + params: + - default: mnist + name: name + - default: kubeflow + name: namespace + - default: '0.99' + name: goal + - default: '3' + name: parallelTrialCount + - default: '12' + name: maxTrialCount + - default: '60' + name: experimentTimeoutMinutes + - default: 'True' + name: deleteAfterDone + tasks: + - name: mnist-hpo + params: + - name: deleteAfterDone + value: $(params.deleteAfterDone) + - name: experimentTimeoutMinutes + value: $(params.experimentTimeoutMinutes) + - name: goal + value: $(params.goal) + - name: maxTrialCount + value: $(params.maxTrialCount) + - name: name + value: $(params.name) + - name: namespace + value: $(params.namespace) + - name: parallelTrialCount + value: $(params.parallelTrialCount) + taskRef: + name: mnist-hpo + - name: my-out-cop + params: + - name: mnist-hpo-bestHyperParameter + value: $(tasks.mnist-hpo.results.bestHyperParameter) + taskRef: + name: my-out-cop diff --git a/sdk/python/tests/compiler/testdata/resourceop_basic.py b/sdk/python/tests/compiler/testdata/resourceop_basic.py new file mode 100644 index 0000000000..3f763f62d9 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/resourceop_basic.py @@ -0,0 +1,67 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +""" +This example demonstrates how to use ResourceOp to specify the value of env var. +""" + +import json +import kfp.dsl as dsl + +_CONTAINER_MANIFEST = """ +{ + "apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "generateName": "resourceop-basic-job-" + }, + "spec": { + "template": { + "metadata": { + "name": "resource-basic" + }, + "spec": { + "containers": [{ + "name": "sample-container", + "image": "k8s.gcr.io/busybox", + "command": ["/usr/bin/env"] + }], + "restartPolicy": "Never" + } + }, + "backoffLimit": 4 + } +} +""" + + +@dsl.pipeline( + name="ResourceOp Basic", + description="A Basic Example on ResourceOp Usage." +) +def resourceop_basic(): + + # Start a container. Print out env vars. + op = dsl.ResourceOp( + name='test-step', + k8s_resource=json.loads(_CONTAINER_MANIFEST), + action='create' + ) + + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(resourceop_basic, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/resourceop_basic.yaml b/sdk/python/tests/compiler/testdata/resourceop_basic.yaml new file mode 100644 index 0000000000..15f09b3189 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/resourceop_basic.yaml @@ -0,0 +1,89 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: test-step +spec: + params: + - description: Action on the resource + name: action + type: string + - default: strategic + description: Merge strategy when using action patch + name: merge-strategy + type: string + - default: '' + description: An express to retrieval data from resource. + name: output + type: string + - default: '' + description: A label selector express to decide if the action on resource is success. + name: success-condition + type: string + - default: '' + description: A label selector express to decide if the action on resource is failure. + name: failure-condition + type: string + - default: index.docker.io/aipipeline/kubeclient:v0.0.2 + description: Kubectl wrapper image + name: image + type: string + - default: 'false' + description: Enable set owner reference for created resource. + name: set-ownerreference + type: string + results: + - description: '{}' + name: manifest + - description: '{.metadata.name}' + name: name + steps: + - args: + - --action=$(params.action) + - --merge-strategy=$(params.merge-strategy) + - "--manifest=apiVersion: batch/v1\nkind: Job\nmetadata:\n generateName: resourceop-basic-job-\n\ + spec:\n backoffLimit: 4\n template:\n metadata:\n name: resource-basic\n\ + \ spec:\n containers:\n - command:\n - /usr/bin/env\n \ + \ image: k8s.gcr.io/busybox\n name: sample-container\n restartPolicy:\ + \ Never\n" + - --output=$(params.output) + - --success-condition=$(params.success-condition) + - --failure-condition=$(params.failure-condition) + - --set-ownerreference=$(params.set-ownerreference) + image: $(params.image) + name: test-step + resources: {} +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "A Basic Example on ResourceOp + Usage.", "name": "ResourceOp Basic"}' + name: resourceop-basic +spec: + params: [] + tasks: + - name: test-step + params: + - name: action + value: create + - name: output + value: "- name: manifest\n valueFrom: '{}'\n- name: name\n valueFrom: '{.metadata.name}'\n" + - name: set-ownerreference + value: 'false' + taskRef: + name: test-step diff --git a/sdk/python/tests/compiler/testdata/volume_op.py b/sdk/python/tests/compiler/testdata/volume_op.py new file mode 100644 index 0000000000..81837e1016 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/volume_op.py @@ -0,0 +1,42 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import kfp +import kfp.dsl as dsl + + +@dsl.pipeline( + name="VolumeOp Basic", + description="A Basic Example on VolumeOp Usage." +) +def volumeop_basic(size): + vop = dsl.VolumeOp( + name="create-pvc", + resource_name="my-pvc", + modes=dsl.VOLUME_MODE_RWO, + size=size + ) + + cop = dsl.ContainerOp( + name="cop", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["echo foo > /mnt/file1"], + pvolumes={"/mnt": vop.volume} + ) + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(volumeop_basic, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/volume_op.yaml b/sdk/python/tests/compiler/testdata/volume_op.yaml new file mode 100644 index 0000000000..aabd520068 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/volume_op.yaml @@ -0,0 +1,128 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: create-pvc +spec: + params: + - description: Action on the resource + name: action + type: string + - default: strategic + description: Merge strategy when using action patch + name: merge-strategy + type: string + - default: '' + description: An express to retrieval data from resource. + name: output + type: string + - default: '' + description: A label selector express to decide if the action on resource is success. + name: success-condition + type: string + - default: '' + description: A label selector express to decide if the action on resource is failure. + name: failure-condition + type: string + - default: index.docker.io/aipipeline/kubeclient:v0.0.2 + description: Kubectl wrapper image + name: image + type: string + - default: 'false' + description: Enable set owner reference for created resource. 
+ name: set-ownerreference + type: string + - name: size + results: + - description: '{}' + name: manifest + - description: '{.metadata.name}' + name: name + - description: '{.status.capacity.storage}' + name: size + steps: + - args: + - --action=$(params.action) + - --merge-strategy=$(params.merge-strategy) + - "--manifest=apiVersion: v1\nkind: PersistentVolumeClaim\nmetadata:\n name:\ + \ $(PIPELINERUN)-my-pvc\nspec:\n accessModes:\n - ReadWriteOnce\n resources:\n\ + \ requests:\n storage: $(inputs.params.size)\n" + - --output=$(params.output) + - --success-condition=$(params.success-condition) + - --failure-condition=$(params.failure-condition) + - --set-ownerreference=$(params.set-ownerreference) + env: + - name: PIPELINERUN + valueFrom: + fieldRef: + fieldPath: metadata.labels['tekton.dev/pipelineRun'] + image: $(params.image) + name: create-pvc + resources: {} +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: cop +spec: + params: + - name: create-pvc-name + steps: + - args: + - echo foo > /mnt/file1 + command: + - sh + - -c + image: library/bash:4.4.23 + name: cop + volumeMounts: + - mountPath: /mnt + name: create-pvc + volumes: + - name: create-pvc + persistentVolumeClaim: + claimName: $(inputs.params.create-pvc-name) +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "A Basic Example on VolumeOp + Usage.", "inputs": [{"name": "size"}], "name": "VolumeOp Basic"}' + name: volumeop-basic +spec: + params: + - name: size + tasks: + - name: create-pvc + params: + - name: action + value: create + - name: output + value: "- name: manifest\n valueFrom: '{}'\n- name: name\n valueFrom: '{.metadata.name}'\n\ + - name: size\n valueFrom: '{.status.capacity.storage}'\n" + - name: set-ownerreference + value: 'false' + - name: size + value: $(params.size) + taskRef: + name: create-pvc + - name: cop + params: + - name: create-pvc-name + value: $(tasks.create-pvc.results.name) + taskRef: + name: cop diff --git a/sdk/python/tests/compiler/testdata/volume_snapshot_op.py b/sdk/python/tests/compiler/testdata/volume_snapshot_op.py new file mode 100644 index 0000000000..d82a7eecc6 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/volume_snapshot_op.py @@ -0,0 +1,96 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""This sample uses Rok as an example to show case how VolumeOp accepts +annotations as an extra argument, and how we can use arbitrary PipelineParams +to determine their contents. + +The specific annotation is Rok-specific, but the use of annotations in such way +is widespread in storage systems integrated with K8s. +""" + +import kfp.dsl as dsl + + +@dsl.pipeline( + name="VolumeSnapshotOp Sequential", + description="""The fourth example of the design doc. 
Please enable the + volume snapshot feature gate in order to run this pipeline.""" +) +def volume_snapshotop_sequential(url): + vop = dsl.VolumeOp( + name="create_volume", + resource_name="vol1", + size="1Gi", + modes=dsl.VOLUME_MODE_RWO + ) + + step1 = dsl.ContainerOp( + name="step1_ingest", + image="google/cloud-sdk:279.0.0", + command=["sh", "-c"], + arguments=["mkdir /data/step1 && " + "gsutil cat %s | gzip -c >/data/step1/file1.gz" % url], + pvolumes={"/data": vop.volume} + ) + + step1_snap = dsl.VolumeSnapshotOp( + name="step1_snap", + resource_name="step1_snap", + volume=step1.pvolume + ) + + step2 = dsl.ContainerOp( + name="step2_gunzip", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["mkdir /data/step2 && " + "gunzip /data/step1/file1.gz -c >/data/step2/file1"], + pvolumes={"/data": step1.pvolume} + ) + + step2_snap = dsl.VolumeSnapshotOp( + name="step2_snap", + resource_name="step2_snap", + volume=step2.pvolume + ) + + step3 = dsl.ContainerOp( + name="step3_copy", + image="library/bash:4.4.23", + command=["sh", "-c"], + arguments=["mkdir /data/step3 && " + "cp -av /data/step2/file1 /data/step3/file3"], + pvolumes={"/data": step2.pvolume} + ) + + step3_snap = dsl.VolumeSnapshotOp( + name="step3_snap", + resource_name="step3_snap", + volume=step3.pvolume + ) + + step4 = dsl.ContainerOp( + name="step4_output", + image="library/bash:4.4.23", + command=["cat", "/data/step2/file1", "/data/step3/file3"], + pvolumes={"/data": step3.pvolume} + ) + + +if __name__ == '__main__': + # don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile + from kfp_tekton.compiler import TektonCompiler + TektonCompiler().compile(volume_snapshotop_sequential, __file__.replace('.py', '.yaml')) diff --git a/sdk/python/tests/compiler/testdata/volume_snapshot_op.yaml b/sdk/python/tests/compiler/testdata/volume_snapshot_op.yaml new file mode 100644 index 0000000000..6ca6ee8ff3 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/volume_snapshot_op.yaml @@ -0,0 +1,456 @@ +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: create-volume +spec: + params: + - description: Action on the resource + name: action + type: string + - default: strategic + description: Merge strategy when using action patch + name: merge-strategy + type: string + - default: '' + description: An express to retrieval data from resource. + name: output + type: string + - default: '' + description: A label selector express to decide if the action on resource is success. + name: success-condition + type: string + - default: '' + description: A label selector express to decide if the action on resource is failure. + name: failure-condition + type: string + - default: index.docker.io/aipipeline/kubeclient:v0.0.2 + description: Kubectl wrapper image + name: image + type: string + - default: 'false' + description: Enable set owner reference for created resource. 
+ name: set-ownerreference + type: string + results: + - description: '{}' + name: manifest + - description: '{.metadata.name}' + name: name + - description: '{.status.capacity.storage}' + name: size + steps: + - args: + - --action=$(params.action) + - --merge-strategy=$(params.merge-strategy) + - "--manifest=apiVersion: v1\nkind: PersistentVolumeClaim\nmetadata:\n name:\ + \ $(PIPELINERUN)-vol1\nspec:\n accessModes:\n - ReadWriteOnce\n resources:\n\ + \ requests:\n storage: 1Gi\n" + - --output=$(params.output) + - --success-condition=$(params.success-condition) + - --failure-condition=$(params.failure-condition) + - --set-ownerreference=$(params.set-ownerreference) + env: + - name: PIPELINERUN + valueFrom: + fieldRef: + fieldPath: metadata.labels['tekton.dev/pipelineRun'] + image: $(params.image) + name: create-volume + resources: {} +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: step1-ingest +spec: + params: + - name: create-volume-name + - name: url + steps: + - args: + - mkdir /data/step1 && gsutil cat $(inputs.params.url) | gzip -c >/data/step1/file1.gz + command: + - sh + - -c + image: google/cloud-sdk:279.0.0 + name: step1-ingest + volumeMounts: + - mountPath: /data + name: create-volume + volumes: + - name: create-volume + persistentVolumeClaim: + claimName: $(inputs.params.create-volume-name) +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: step1-snap +spec: + params: + - description: Action on the resource + name: action + type: string + - default: strategic + description: Merge strategy when using action patch + name: merge-strategy + type: string + - default: '' + description: An express to retrieval data from resource. + name: output + type: string + - default: '' + description: A label selector express to decide if the action on resource is success. + name: success-condition + type: string + - default: '' + description: A label selector express to decide if the action on resource is failure. + name: failure-condition + type: string + - default: index.docker.io/aipipeline/kubeclient:v0.0.2 + description: Kubectl wrapper image + name: image + type: string + - default: 'false' + description: Enable set owner reference for created resource. 
+ name: set-ownerreference + type: string + - name: create-volume-name + results: + - description: '{}' + name: manifest + - description: '{.metadata.name}' + name: name + - description: '{.status.restoreSize}' + name: size + steps: + - args: + - --action=$(params.action) + - --merge-strategy=$(params.merge-strategy) + - "--manifest=apiVersion: snapshot.storage.k8s.io/v1alpha1\nkind: VolumeSnapshot\n\ + metadata:\n name: $(PIPELINERUN)-step1-snap\nspec:\n source:\n kind: PersistentVolumeClaim\n\ + \ name: $(inputs.params.create-volume-name)\n" + - --output=$(params.output) + - --success-condition=$(params.success-condition) + - --failure-condition=$(params.failure-condition) + - --set-ownerreference=$(params.set-ownerreference) + env: + - name: PIPELINERUN + valueFrom: + fieldRef: + fieldPath: metadata.labels['tekton.dev/pipelineRun'] + image: $(params.image) + name: step1-snap + resources: {} +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: step2-gunzip +spec: + params: + - name: create-volume-name + steps: + - args: + - mkdir /data/step2 && gunzip /data/step1/file1.gz -c >/data/step2/file1 + command: + - sh + - -c + image: library/bash:4.4.23 + name: step2-gunzip + volumeMounts: + - mountPath: /data + name: create-volume + volumes: + - name: create-volume + persistentVolumeClaim: + claimName: $(inputs.params.create-volume-name) +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: step2-snap +spec: + params: + - description: Action on the resource + name: action + type: string + - default: strategic + description: Merge strategy when using action patch + name: merge-strategy + type: string + - default: '' + description: An express to retrieval data from resource. + name: output + type: string + - default: '' + description: A label selector express to decide if the action on resource is success. + name: success-condition + type: string + - default: '' + description: A label selector express to decide if the action on resource is failure. + name: failure-condition + type: string + - default: index.docker.io/aipipeline/kubeclient:v0.0.2 + description: Kubectl wrapper image + name: image + type: string + - default: 'false' + description: Enable set owner reference for created resource. 
+ name: set-ownerreference + type: string + - name: create-volume-name + results: + - description: '{}' + name: manifest + - description: '{.metadata.name}' + name: name + - description: '{.status.restoreSize}' + name: size + steps: + - args: + - --action=$(params.action) + - --merge-strategy=$(params.merge-strategy) + - "--manifest=apiVersion: snapshot.storage.k8s.io/v1alpha1\nkind: VolumeSnapshot\n\ + metadata:\n name: $(PIPELINERUN)-step2-snap\nspec:\n source:\n kind: PersistentVolumeClaim\n\ + \ name: $(inputs.params.create-volume-name)\n" + - --output=$(params.output) + - --success-condition=$(params.success-condition) + - --failure-condition=$(params.failure-condition) + - --set-ownerreference=$(params.set-ownerreference) + env: + - name: PIPELINERUN + valueFrom: + fieldRef: + fieldPath: metadata.labels['tekton.dev/pipelineRun'] + image: $(params.image) + name: step2-snap + resources: {} +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: step3-copy +spec: + params: + - name: create-volume-name + steps: + - args: + - mkdir /data/step3 && cp -av /data/step2/file1 /data/step3/file3 + command: + - sh + - -c + image: library/bash:4.4.23 + name: step3-copy + volumeMounts: + - mountPath: /data + name: create-volume + volumes: + - name: create-volume + persistentVolumeClaim: + claimName: $(inputs.params.create-volume-name) +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: step3-snap +spec: + params: + - description: Action on the resource + name: action + type: string + - default: strategic + description: Merge strategy when using action patch + name: merge-strategy + type: string + - default: '' + description: An express to retrieval data from resource. + name: output + type: string + - default: '' + description: A label selector express to decide if the action on resource is success. + name: success-condition + type: string + - default: '' + description: A label selector express to decide if the action on resource is failure. + name: failure-condition + type: string + - default: index.docker.io/aipipeline/kubeclient:v0.0.2 + description: Kubectl wrapper image + name: image + type: string + - default: 'false' + description: Enable set owner reference for created resource. 
+ name: set-ownerreference + type: string + - name: create-volume-name + results: + - description: '{}' + name: manifest + - description: '{.metadata.name}' + name: name + - description: '{.status.restoreSize}' + name: size + steps: + - args: + - --action=$(params.action) + - --merge-strategy=$(params.merge-strategy) + - "--manifest=apiVersion: snapshot.storage.k8s.io/v1alpha1\nkind: VolumeSnapshot\n\ + metadata:\n name: $(PIPELINERUN)-step3-snap\nspec:\n source:\n kind: PersistentVolumeClaim\n\ + \ name: $(inputs.params.create-volume-name)\n" + - --output=$(params.output) + - --success-condition=$(params.success-condition) + - --failure-condition=$(params.failure-condition) + - --set-ownerreference=$(params.set-ownerreference) + env: + - name: PIPELINERUN + valueFrom: + fieldRef: + fieldPath: metadata.labels['tekton.dev/pipelineRun'] + image: $(params.image) + name: step3-snap + resources: {} +--- +apiVersion: tekton.dev/v1beta1 +kind: Task +metadata: + name: step4-output +spec: + params: + - name: create-volume-name + steps: + - command: + - cat + - /data/step2/file1 + - /data/step3/file3 + image: library/bash:4.4.23 + name: step4-output + volumeMounts: + - mountPath: /data + name: create-volume + volumes: + - name: create-volume + persistentVolumeClaim: + claimName: $(inputs.params.create-volume-name) +--- +apiVersion: tekton.dev/v1beta1 +kind: Pipeline +metadata: + annotations: + pipelines.kubeflow.org/pipeline_spec: '{"description": "The fourth example of + the design doc. Please enable the\n volume snapshot feature gate in order + to run this pipeline.", "inputs": [{"name": "url"}], "name": "VolumeSnapshotOp + Sequential"}' + name: volumesnapshotop-sequential +spec: + params: + - name: url + tasks: + - name: create-volume + params: + - name: action + value: create + - name: output + value: "- name: manifest\n valueFrom: '{}'\n- name: name\n valueFrom: '{.metadata.name}'\n\ + - name: size\n valueFrom: '{.status.capacity.storage}'\n" + - name: set-ownerreference + value: 'false' + taskRef: + name: create-volume + - name: step1-ingest + params: + - name: create-volume-name + value: $(tasks.create-volume.results.name) + - name: url + value: $(params.url) + taskRef: + name: step1-ingest + - name: step1-snap + params: + - name: action + value: create + - name: output + value: "- name: manifest\n valueFrom: '{}'\n- name: name\n valueFrom: '{.metadata.name}'\n\ + - name: size\n valueFrom: '{.status.restoreSize}'\n" + - name: success-condition + value: status.readyToUse == true + - name: set-ownerreference + value: 'false' + - name: create-volume-name + value: $(tasks.create-volume.results.name) + runAfter: + - step1-ingest + taskRef: + name: step1-snap + - name: step2-gunzip + params: + - name: create-volume-name + value: $(tasks.create-volume.results.name) + runAfter: + - step1-ingest + taskRef: + name: step2-gunzip + - name: step2-snap + params: + - name: action + value: create + - name: output + value: "- name: manifest\n valueFrom: '{}'\n- name: name\n valueFrom: '{.metadata.name}'\n\ + - name: size\n valueFrom: '{.status.restoreSize}'\n" + - name: success-condition + value: status.readyToUse == true + - name: set-ownerreference + value: 'false' + - name: create-volume-name + value: $(tasks.create-volume.results.name) + runAfter: + - step2-gunzip + taskRef: + name: step2-snap + - name: step3-copy + params: + - name: create-volume-name + value: $(tasks.create-volume.results.name) + runAfter: + - step2-gunzip + taskRef: + name: step3-copy + - name: step3-snap + params: + - name: action + 
value: create + - name: output + value: "- name: manifest\n valueFrom: '{}'\n- name: name\n valueFrom: '{.metadata.name}'\n\ + - name: size\n valueFrom: '{.status.restoreSize}'\n" + - name: success-condition + value: status.readyToUse == true + - name: set-ownerreference + value: 'false' + - name: create-volume-name + value: $(tasks.create-volume.results.name) + runAfter: + - step3-copy + taskRef: + name: step3-snap + - name: step4-output + params: + - name: create-volume-name + value: $(tasks.create-volume.results.name) + runAfter: + - step3-copy + taskRef: + name: step4-output diff --git a/sdk/python/tests/config.yaml b/sdk/python/tests/config.yaml new file mode 100644 index 0000000000..4d863a9541 --- /dev/null +++ b/sdk/python/tests/config.yaml @@ -0,0 +1,13 @@ +pipeline: compose.py +type: nested +components: +- name: save_most_frequent_word +- name: download_save_most_frequent_word +--- +pipeline: basic_no_decorator.py +type: no_decorator +components: + function: save_most_frequent_word + name: 'Save Most Frequent' + description: 'Get Most Frequent Word and Save to GCS' + paramsList: ["message_param", "output_path_param"] \ No newline at end of file diff --git a/sdk/python/tests/test_kfp_samples.sh b/sdk/python/tests/test_kfp_samples.sh index 3def3ae0af..272464b0c6 100755 --- a/sdk/python/tests/test_kfp_samples.sh +++ b/sdk/python/tests/test_kfp_samples.sh @@ -1,4 +1,4 @@ -#!/bin/bash -e +#!/bin/bash # Copyright 2020 kubeflow.org # @@ -33,6 +33,7 @@ KFP_TESTDATA_DIR="${KFP_CLONE_DIR}/sdk/python/tests/compiler/testdata" TEKTON_COMPILED_YAML_DIR="${TEMP_DIR}/tekton_compiler_output" COMPILE_REPORT_FILE="${PROJECT_DIR}/sdk/python/tests/test_kfp_samples_report.txt" COMPILER_OUTPUTS_FILE="${TEMP_DIR}/test_kfp_samples_output.txt" +CONFIG_FILE="${PROJECT_DIR}/sdk/python/tests/config.yaml" mkdir -p "${TEMP_DIR}" mkdir -p "${TEKTON_COMPILED_YAML_DIR}" @@ -63,21 +64,29 @@ fi echo # just adding some separation for console output -# keep a record of the previous compilation status -SUCCESS_BEFORE=$(grep -c "SUCCESS" "${COMPILE_REPORT_FILE}") -FAILURE_BEFORE=$(grep -c "FAILURE" "${COMPILE_REPORT_FILE}") -TOTAL_BEFORE=$(grep -c . "${COMPILE_REPORT_FILE}") +# create a temporary copy of the previous compilation report +COMPILE_REPORT_FILE_OLD="${COMPILE_REPORT_FILE/%.txt/_before.txt}" +cp "${COMPILE_REPORT_FILE}" "${COMPILE_REPORT_FILE_OLD}" # delete the previous compiler output file rm -f "${COMPILER_OUTPUTS_FILE}" +# check which pipelines have special configurations +PIPELINES=$(awk '/pipeline:/{print $NF}' ${CONFIG_FILE}) + +# compile each of the Python scripts in the KFP testdata folder for f in "${KFP_TESTDATA_DIR}"/*.py; do echo -e "\nCompiling ${f##*/}:" >> "${COMPILER_OUTPUTS_FILE}" - if dsl-compile-tekton --py "${f}" --output "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" >> "${COMPILER_OUTPUTS_FILE}" 2>&1; - then - echo "SUCCESS: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}" + IS_SPECIAL=$(grep -E ${f##*/} <<< ${PIPELINES}) + if [ -z "${IS_SPECIAL}" ]; then + if dsl-compile-tekton --py "${f}" --output "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" >> "${COMPILER_OUTPUTS_FILE}" 2>&1; + then + echo "SUCCESS: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}" + else + echo "FAILURE: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}" + fi else - echo "FAILURE: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}" + python3 -m test_util ${f} ${CONFIG_FILE} | grep -E 'SUCCESS:|FAILURE:' fi done | tee "${COMPILE_REPORT_FILE}" @@ -90,21 +99,29 @@ TOTAL=$(grep -c . 
"${COMPILE_REPORT_FILE}") echo "Success: ${SUCCESS}" echo "Failure: ${FAILURE}" echo "Total: ${TOTAL}" -) | tee -a "${COMPILE_REPORT_FILE}" +) # | tee -a "${COMPILE_REPORT_FILE}" # do not include totals in report file to avoid constant merge conflicts echo echo "Compilation status report: ${COMPILE_REPORT_FILE#${PROJECT_DIR}/}" echo "Accumulated compiler logs: ${COMPILER_OUTPUTS_FILE#${PROJECT_DIR}/}" echo "Compiled Tekton YAML files: ${TEKTON_COMPILED_YAML_DIR#${PROJECT_DIR}/}/" echo -# for Travis/CI integration return exit code 1 if success rate declined -if [ ${SUCCESS} -lt "${SUCCESS_BEFORE}" ]; then - echo "It appears that fewer KFP test scripts are compiling than before!" +# for Travis/CI integration return exit code 1 if this report is different from the previous report +# sort the list of files since we cannot ensure same sort order on MacOS (local) and Linux (build machine) +if ! diff -q -a -w -B <(sort "${COMPILE_REPORT_FILE}") <(sort "${COMPILE_REPORT_FILE_OLD}") >/dev/null 2>&1 ; then + echo + echo "This compilation report (left) differs from the previous report (right):" + echo + diff -y -W 80 --suppress-common-lines -d \ + <(sort -k2 "${COMPILE_REPORT_FILE}") \ + <(sort -k2 "${COMPILE_REPORT_FILE_OLD}") echo - echo "Success before: ${SUCCESS_BEFORE}" - echo "Failure before: ${FAILURE_BEFORE}" - echo "Total before: ${TOTAL_BEFORE}" + rm -f "${COMPILE_REPORT_FILE_OLD}" exit 1 else + echo + echo "This compilation report did not change from the previous report." + echo + rm -f "${COMPILE_REPORT_FILE_OLD}" exit 0 fi diff --git a/sdk/python/tests/test_kfp_samples_report.txt b/sdk/python/tests/test_kfp_samples_report.txt index b0ff66ebc8..3606971f0b 100644 --- a/sdk/python/tests/test_kfp_samples_report.txt +++ b/sdk/python/tests/test_kfp_samples_report.txt @@ -1,34 +1,30 @@ SUCCESS: add_pod_env.py SUCCESS: artifact_location.py SUCCESS: basic.py -FAILURE: basic_no_decorator.py -SUCCESS: coin.py -FAILURE: compose.py +SUCCESS: basic_no_decorator.py +FAILURE: coin.py +SUCCESS: compose.py SUCCESS: default_value.py -FAILURE: input_artifact_raw_value.py +SUCCESS: input_artifact_raw_value.py FAILURE: loop_over_lightweight_output.py SUCCESS: param_op_transform.py -FAILURE: param_substitutions.py +SUCCESS: param_substitutions.py SUCCESS: pipelineparams.py -SUCCESS: recursive_do_while.py +FAILURE: recursive_do_while.py SUCCESS: recursive_while.py -FAILURE: resourceop_basic.py +SUCCESS: resourceop_basic.py SUCCESS: sidecar.py SUCCESS: timeout.py SUCCESS: volume.py -FAILURE: volume_snapshotop_rokurl.py -FAILURE: volume_snapshotop_sequential.py -FAILURE: volumeop_basic.py -FAILURE: volumeop_dag.py -FAILURE: volumeop_parallel.py -FAILURE: volumeop_sequential.py +SUCCESS: volume_snapshotop_rokurl.py +SUCCESS: volume_snapshotop_sequential.py +SUCCESS: volumeop_basic.py +SUCCESS: volumeop_dag.py +SUCCESS: volumeop_parallel.py +SUCCESS: volumeop_sequential.py SUCCESS: withitem_basic.py SUCCESS: withitem_nested.py FAILURE: withparam_global.py FAILURE: withparam_global_dict.py FAILURE: withparam_output.py FAILURE: withparam_output_dict.py - -Success: 13 -Failure: 17 -Total: 30 diff --git a/sdk/python/tests/test_util.py b/sdk/python/tests/test_util.py new file mode 100644 index 0000000000..8212c59820 --- /dev/null +++ b/sdk/python/tests/test_util.py @@ -0,0 +1,102 @@ +#!/bin/bash + +# Copyright 2020 kubeflow.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+import shutil
+import zipfile
+import yaml
+import tempfile
+import importlib
+import kfp_tekton.compiler as compiler
+import filecmp
+
+def _get_yaml_from_zip(zip_file):
+    with zipfile.ZipFile(zip_file, 'r') as zip:
+        with open(zip.extract(zip.namelist()[0]), 'r') as yaml_file:
+            return list(yaml.safe_load_all(yaml_file))
+
+def get_config(config_path):
+    with open(config_path) as file:
+        return list(yaml.safe_load_all(file))
+
+def get_params_from_config(pipeline_name, config_path):
+    pipelines = get_config(config_path)
+
+    for pipeline in pipelines:
+        if pipeline_name == pipeline["pipeline"]:
+            return pipeline
+
+def test_workflow_without_decorator(pipeline_mod, params_dict):
+    """Test compiling a workflow and appending pipeline params."""
+
+    try:
+        pipeline_params = []
+        for param in params_dict.get('paramsList', []):
+            pipeline_params.append(getattr(pipeline_mod, param))
+
+        compiled_workflow = compiler.TektonCompiler()._create_workflow(
+            getattr(pipeline_mod, params_dict['function']),
+            params_dict.get('name', None),
+            params_dict.get('description', None),
+            pipeline_params if pipeline_params else None,
+            params_dict.get('conf', None))
+        return True
+    except Exception:
+        return False
+
+def test_nested_workflow(pipeline_mod, pipeline_list):
+    """Test compiling a simple workflow, and a bigger one composed from the simple one."""
+
+    tmpdir = tempfile.mkdtemp()
+    try:
+        for pipeline in pipeline_list:
+            pipeline_name = pipeline['name']
+            package_path = os.path.join(tmpdir, pipeline_name + '.zip')
+            compiler.TektonCompiler().compile(getattr(pipeline_mod, pipeline_name), package_path)
+        return True
+    except Exception:
+        return False
+
+
+if __name__ == '__main__':
+    test_data_path = sys.argv[1]
+    config_path = sys.argv[2]
+    did_compile = False
+
+    # Import pipeline
+    test_data_dir, test_data_file = os.path.split(test_data_path)
+    import_name, test_data_ext = os.path.splitext(test_data_file)
+    sys.path.append(test_data_dir)
+    pipeline_mod = importlib.import_module(import_name)
+
+    # Get the pipeline specific parameters from the config file
+    params = get_params_from_config(test_data_file, config_path)
+    if params is None:
+        raise ValueError('No matching pipeline found in the config file')
+    test_type = params['type']
+
+    if test_type == 'nested':
+        did_compile = test_nested_workflow(pipeline_mod, params['components'])
+    elif test_type == 'no_decorator':
+        did_compile = test_workflow_without_decorator(pipeline_mod, params['components'])
+    else:
+        raise ValueError('Pipeline type \''+test_type+'\' is not recognized')
+
+    if did_compile:
+        print("SUCCESS:", test_data_file)
+    else:
+        print("FAILURE:", test_data_file)

From ddc40d8d51f7b2884c8b4b0a7e4f0e1db1c47e1b Mon Sep 17 00:00:00 2001
From: Tommy Li
Date: Tue, 21 Apr 2020 13:25:48 -0700
Subject: [PATCH 2/3] fix conflicts

---
 .../kfp_tekton/compiler/_op_to_template.py    | 28 -------------------
 1 file changed, 28 deletions(-)

diff --git a/sdk/python/kfp_tekton/compiler/_op_to_template.py b/sdk/python/kfp_tekton/compiler/_op_to_template.py
index af8f0d02b2..a7b689eb8c 100644
--- 
a/sdk/python/kfp_tekton/compiler/_op_to_template.py +++ b/sdk/python/kfp_tekton/compiler/_op_to_template.py @@ -234,7 +234,6 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): template['spec']['params'] = inputs['parameters'] elif isinstance(op, dsl.ResourceOp): template['spec']['params'].extend(inputs['parameters']) -<<<<<<< HEAD if 'artifacts' in inputs: # The input artifacts in KFP is not pulling from s3, it will always be passed as a raw input. # Visit https://github.com/kubeflow/pipelines/issues/336 for more details on the implementation. @@ -256,10 +255,6 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): template['spec']['stepTemplate'] = {} template['spec']['stepTemplate']['volumeMounts'] = volume_mount_step_template template['spec']['volumes'] = volume_template -======= - elif 'artifacts' in inputs: - raise NotImplementedError("input artifacts are not yet implemented") ->>>>>>> upstream/master # outputs if isinstance(op, dsl.ContainerOp): @@ -270,19 +265,10 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): param_outputs = {} outputs_dict = _outputs_to_json(op, op_outputs, param_outputs, output_artifacts) if outputs_dict: -<<<<<<< HEAD -<<<<<<< HEAD -======= -======= ->>>>>>> upstream/master volume_mount_step_template = [] volume_template = [] mounted_param_paths = [] replaced_param_list = [] -<<<<<<< HEAD ->>>>>>> upstream/master -======= ->>>>>>> upstream/master if outputs_dict.get('parameters'): """ Since Tekton results need to be under /tekton/results. If file output paths cannot be @@ -299,21 +285,7 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): cp $LOCALPATH $(results.data.path); """ template['spec']['results'] = [] -<<<<<<< HEAD -<<<<<<< HEAD copy_results_step = copy.deepcopy(base_step) -======= -======= ->>>>>>> upstream/master - copy_results_step = { - 'image': 'busybox', - 'name': 'copy-results', - 'script': '#!/bin/sh\nset -exo pipefail\n' - } -<<<<<<< HEAD ->>>>>>> upstream/master -======= ->>>>>>> upstream/master for name, path in processed_op.file_outputs.items(): name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore template['spec']['results'].append({ From 7e4df54198c615ae61a172212b45105dc95129a2 Mon Sep 17 00:00:00 2001 From: Tommy Li Date: Wed, 22 Apr 2020 11:47:33 -0700 Subject: [PATCH 3/3] fix merging bug --- sdk/python/kfp_tekton/compiler/_op_to_template.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/python/kfp_tekton/compiler/_op_to_template.py b/sdk/python/kfp_tekton/compiler/_op_to_template.py index a7b689eb8c..e7f55b73e2 100644 --- a/sdk/python/kfp_tekton/compiler/_op_to_template.py +++ b/sdk/python/kfp_tekton/compiler/_op_to_template.py @@ -265,10 +265,6 @@ def _op_to_template(op: BaseOp, enable_artifacts=False): param_outputs = {} outputs_dict = _outputs_to_json(op, op_outputs, param_outputs, output_artifacts) if outputs_dict: - volume_mount_step_template = [] - volume_template = [] - mounted_param_paths = [] - replaced_param_list = [] if outputs_dict.get('parameters'): """ Since Tekton results need to be under /tekton/results. If file output paths cannot be