Support artifact outputs #92

Merged · 13 commits · Apr 21, 2020
32 changes: 31 additions & 1 deletion sdk/README.md
@@ -82,7 +82,9 @@ Here we update the `Compiler` of the KFP SDK to generate `Tekton` YAML for a bas

Please [refer to the instructions here](./python/tests/README.md) as you work on a PR to test the sample Kubeflow Pipelines in the test data folder, ensuring your PR improves the number of successful samples.

## Compile Kubeflow Pipelines as Tekton pipelineRun
## Experimental features

### 1. Compile Kubeflow Pipelines as Tekton pipelineRun

By default, Tekton pipelineRun is generated by the `tkn` CLI so that users can interactively change their pipeline parameters during each execution. However, the `tkn` CLI lags behind on several important features when generating pipelineRun. Therefore, we added support for generating pipelineRun using `dsl-compile-tekton` with all the latest kfp-tekton compiler features. The comparison between Tekton pipeline and Argo workflow is described in our [design docs](https://docs.google.com/document/d/1oXOdiItI4GbEe_qzyBmMAqfLBjfYX1nM94WHY3EPa94/edit#heading=h.f38y0bqkxo87).

@@ -96,7 +98,35 @@ As of today, the below pipelineRun features are available within `dsl-compile-te
To compile Kubeflow Pipelines as a Tekton pipelineRun, simply add the `--generate-pipelinerun` flag to your `dsl-compile-tekton` commands, e.g.
- `dsl-compile-tekton --py sdk/python/tests/compiler/testdata/tolerations.py --output pipeline.yaml --generate-pipelinerun`
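
For reference, the kind of pipeline being compiled here can be as simple as the following sketch, written in the KFP v1 DSL. This is a hypothetical illustration loosely modeled on the `tolerations.py` test data; the names and toleration values are illustrative, not the actual test content.

```python
# Hypothetical sketch of a pipeline with tolerations in the KFP v1 DSL. Names
# and toleration values are illustrative; see testdata/tolerations.py for the
# real test case.
import kfp.dsl as dsl
from kubernetes.client import V1Toleration


@dsl.pipeline(name='tolerations', description='A pipeline with tolerations')
def tolerations_pipeline():
    op = dsl.ContainerOp(
        name='echo',
        image='busybox',
        command=['sh', '-c'],
        arguments=['echo hello'])
    # The toleration ends up on the single pipeline-wide podTemplate
    # (see the Troubleshooting section below).
    op.add_toleration(V1Toleration(
        effect='NoSchedule', key='gpu', operator='Equal', value='present'))
```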

### 2. Compile Kubeflow Pipelines with artifacts enabled

Prerequisite: Install [Kubeflow Pipelines](https://www.kubeflow.org/docs/pipelines/installation/).

By default, artifacts are disabled because they depend on Kubeflow Pipelines' MinIO setup. When artifacts are enabled, all output parameters are also treated as artifacts and persisted to the default object storage. Enabling artifacts also allows files to be downloaded or stored as artifact inputs/outputs. Since artifacts depend on the Kubeflow Pipelines setup by default, the generated Tekton pipeline must be deployed to the same namespace as Kubeflow Pipelines.

To compile Kubeflow Pipelines with artifacts enabled, simply add the `--enable-artifacts` flag to your `dsl-compile-tekton` commands. Then, run the pipeline in the same namespace as Kubeflow Pipelines using the `-n` flag, e.g.
```shell
dsl-compile-tekton --py sdk/python/tests/compiler/testdata/artifact_location.py --output pipeline.yaml --enable-artifacts
kubectl apply -f pipeline.yaml -n kubeflow
tkn pipeline start custom-artifact-location-pipeline --showlog -n kubeflow
```

You should see output like the following, indicating that the artifacts are stored in the object storage you specified.
```
? Value for param `secret_name` of type `string`? (Default is `mlpipeline-minio-artifact`) mlpipeline-minio-artifact
? Value for param `tag` of type `string`? (Default is `1.31.0`) 1.31.0
? Value for param `namespace` of type `string`? (Default is `kubeflow`) kubeflow
? Value for param `bucket` of type `string`? (Default is `mlpipeline`) mlpipeline
Pipelinerun started: custom-artifact-location-pipeline-run-b87bq
Waiting for logs to be available...

[generate-output : copy-artifacts] Added `storage` successfully.
[generate-output : copy-artifacts] `/tekton/results/output` -> `storage/mlpipeline/runs/custom-artifact-location-pipeline-run-b87bq/custom-artifact-location-pipeline-run-b87bq-generate-outp-7rnxv/output.txt`
[generate-output : copy-artifacts] Total: 0 B, Transferred: 6 B, Speed: 504 B/s
```
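
For context, a pipeline with a custom artifact location looks roughly like the sketch below. This is a hypothetical illustration loosely modeled on `artifact_location.py`; the parameter defaults and op contents are illustrative and the real sample may differ.

```python
# Hypothetical sketch of a pipeline with a custom artifact location in the
# KFP v1 DSL, loosely modeled on testdata/artifact_location.py; the parameter
# defaults and op contents are illustrative.
import kfp.dsl as dsl
from kubernetes.client import V1SecretKeySelector


@dsl.pipeline(name='custom-artifact-location-pipeline')
def custom_artifact_location(
        secret_name='mlpipeline-minio-artifact',
        namespace='kubeflow',
        bucket='mlpipeline'):
    # Configure the S3-compatible (MinIO) location that output artifacts
    # will be copied to by the generated copy-artifacts step.
    dsl.get_pipeline_conf().set_artifact_location(dsl.ArtifactLocation.s3(
        bucket=bucket,
        endpoint='minio-service.%s:9000' % namespace,
        insecure=True,
        access_key_secret=V1SecretKeySelector(name=secret_name, key='accesskey'),
        secret_key_secret=V1SecretKeySelector(name=secret_name, key='secretkey')))

    # Each declared file output is also persisted as an artifact when the
    # pipeline is compiled with --enable-artifacts.
    dsl.ContainerOp(
        name='generate-output',
        image='busybox',
        command=['sh', '-c'],
        arguments=['echo hello world > /tmp/output.txt'],
        file_outputs={'output': '/tmp/output.txt'})
```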

## Troubleshooting
- Please be aware that any defined affinity, node selector, and tolerations are applied to all tasks in the same pipeline, because only one podTemplate is allowed per pipeline.

- When you add test cases to `compiler_tests`, the output pipeline/pipelineRun YAML may contain nondeterministic values or ordering; in that case, define a lambda function as `normalize_compiler_output_function` to normalize the output so the tests pass.
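
  For example, a normalization function can mask the nondeterministic parts before the comparison. This is a hypothetical sketch; the exact hook signature in `compiler_tests` may differ.

  ```python
  import re

  # Hypothetical sketch: mask the randomly generated pipelineRun name suffix so
  # the compiled output can be compared against a golden YAML deterministically.
  normalize_compiler_output_function = lambda yaml_text: re.sub(
      r'-run-[a-z0-9]+', '-run-XXXXX', yaml_text)
  ```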

198 changes: 133 additions & 65 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
@@ -17,6 +17,11 @@
from kfp.compiler._op_to_template import _process_obj, _inputs_to_json, _outputs_to_json
from kfp import dsl
from kfp.dsl._container_op import BaseOp
from kfp.dsl import ArtifactLocation
from urllib.parse import urlparse
import textwrap
import re
import os

from .. import tekton_api_version

@@ -63,7 +68,7 @@ def _process_base_ops(op: BaseOp):
return op


def _op_to_template(op: BaseOp):
def _op_to_template(op: BaseOp, enable_artifacts=False):
"""Generate template given an operator inherited from BaseOp."""

# NOTE in-place update to BaseOp
@@ -77,15 +82,18 @@ def _op_to_template(op: BaseOp):
# This should have been as easy as output_artifact_paths.update(op.file_outputs), but the _outputs_to_json function changes the output names and we must do the same here, so that the names are the same
output_artifact_paths.update(sorted(((param.full_name, processed_op.file_outputs[param.name]) for param in processed_op.outputs.values()), key=lambda x: x[0]))

output_artifacts = [
# convert_k8s_obj_to_json(
# ArtifactLocation.create_artifact_for_s3(
# op.artifact_location,
# name=name,
# path=path,
# key='runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz'))
# for name, path in output_artifact_paths.items()
]
if enable_artifacts:
output_artifacts = [
convert_k8s_obj_to_json(
ArtifactLocation.create_artifact_for_s3(
op.artifact_location,
name=name,
path=path,
key='runs/$PIPELINERUN/$PODNAME/' + name))
for name, path in output_artifact_paths.items()
]
else:
output_artifacts = []

# workflow template
container = convert_k8s_obj_to_json(
@@ -208,68 +216,128 @@ def _op_to_template(op: BaseOp):
param_outputs = {}
outputs_dict = _outputs_to_json(op, op_outputs, param_outputs, output_artifacts)
if outputs_dict:
"""
Tekton results need to be under /tekton/results. If file output paths cannot be
configured to /tekton/results, we need to create the copy step below to move
file outputs to the Tekton destination. BusyBox is recommended for small tasks
because it's relatively lightweight and small compared to the ubuntu and
bash images.

- image: busybox
name: copy-results
script: |
#!/bin/sh
set -exo pipefail
cp $LOCALPATH $(results.data.path);
"""
template['spec']['results'] = []
copy_results_step = {
'image': 'busybox',
'name': 'copy-results',
'script': '#!/bin/sh\nset -exo pipefail\n'
}
volume_mount_step_template = []
volume_template = []
mounted_paths = []
for name, path in param_outputs.items():
name = name.replace('_', '-') # replace '_' with '-' since Tekton results don't support underscores
template['spec']['results'].append({
'name': name,
'description': path
})
# replace all occurrences of the output file path with the Tekton output parameter expression
need_copy_step = True
for s in template['spec']['steps']:
if 'command' in s:
commands = []
for c in s['command']:
if path in c:
c = c.replace(path, '$(results.%s.path)' % name)
need_copy_step = False
commands.append(c)
s['command'] = commands
if 'args' in s:
args = []
for a in s['args']:
if path in a:
a = a.replace(path, '$(results.%s.path)' % name)
need_copy_step = False
args.append(a)
s['args'] = args
# If file output path cannot be found/replaced, use emptyDir to copy it to the tekton/results path
if need_copy_step:
copy_results_step['script'] = copy_results_step['script'] + 'cp ' + path + ' $(results.%s.path);' % name + '\n'
mountPath = path.rsplit("/", 1)[0]
if mountPath not in mounted_paths:
volume_mount_step_template.append({'name': name, 'mountPath': path.rsplit("/", 1)[0]})
volume_template.append({'name': name, 'emptyDir': {}})
mounted_paths.append(mountPath)
if mounted_paths:
mounted_param_paths = []
replaced_param_list = []
if outputs_dict.get('parameters'):
"""
Tekton results need to be under /tekton/results. If file output paths cannot be
configured to /tekton/results, we need to create the copy step below to move
file outputs to the Tekton destination. BusyBox is recommended for small tasks
because it's relatively lightweight and small compared to the ubuntu and
bash images.

- image: busybox
name: copy-results
script: |
#!/bin/sh
set -exo pipefail
cp $LOCALPATH $(results.data.path);
"""
template['spec']['results'] = []
copy_results_step = {
'image': 'busybox',
'name': 'copy-results',
'script': '#!/bin/sh\nset -exo pipefail\n'
}
for name, path in processed_op.file_outputs.items():
name = name.replace('_', '-') # replace '_' with '-' since Tekton results don't support underscores
template['spec']['results'].append({
'name': name,
'description': path
})
# replace all occurrences of the output file path with the Tekton output parameter expression
need_copy_step = True
for s in template['spec']['steps']:
if 'command' in s:
commands = []
for c in s['command']:
if path in c:
c = c.replace(path, '$(results.%s.path)' % name)
need_copy_step = False
commands.append(c)
s['command'] = commands
if 'args' in s:
args = []
for a in s['args']:
if path in a:
a = a.replace(path, '$(results.%s.path)' % name)
need_copy_step = False
args.append(a)
s['args'] = args
# If file output path cannot be found/replaced, use emptyDir to copy it to the tekton/results path
if need_copy_step:
copy_results_step['script'] = copy_results_step['script'] + 'cp ' + path + ' $(results.%s.path);' % name + '\n'
mountPath = path.rsplit("/", 1)[0]
if mountPath not in mounted_param_paths:
volume_mount_step_template.append({'name': name, 'mountPath': path.rsplit("/", 1)[0]})
volume_template.append({'name': name, 'emptyDir': {}})
mounted_param_paths.append(mountPath)
# Record what artifacts are moved to result parameters.
parameter_name = (processed_op.name + '-' + name).replace(' ', '-').replace('_', '-')
replaced_param_list.append(parameter_name)
copy_artifacts_step = {}
if outputs_dict.get('artifacts'):
"""
For storing artifacts, we use the minio/mc image because we need to upload artifacts to any type of
object storage and endpoint. The minio/mc image is well suited for this task because the default
KFP installation uses MinIO, and it also works with other S3/GCS-style storage.

- image: minio/mc
name: copy-artifacts
script: |
#!/usr/bin/env sh
mc config host add storage http://minio-service.$NAMESPACE:9000 $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
mc cp /tmp/file.txt storage/$(inputs.params.bucket)/runs/$PIPELINERUN/$PODNAME/file.txt
"""
# TODO: Pull default values from KFP configmap when integrated with KFP.
storage_location = outputs_dict['artifacts'][0].get('s3', {})
insecure = storage_location.get("insecure", True)
endpoint = storage_location.get("endpoint", "minio-service.$NAMESPACE:9000")
# We want to use the insecure flag to figure out whether to use http or https scheme
endpoint = re.sub(r"https?://", "", endpoint)
endpoint = 'http://' + endpoint if insecure else 'https://' + endpoint
access_key = storage_location.get("accessKeySecret", {"name": "mlpipeline-minio-artifact", "key": "accesskey"})
secret_access_key = storage_location.get("secretKeySecret", {"name": "mlpipeline-minio-artifact", "key": "secretkey"})
bucket = storage_location.get("bucket", "mlpipeline")
copy_artifacts_step = {
'image': 'minio/mc',
> **Review comment (Member):** this block is growing really large; should we move this into a separate function/method, and maybe make the `copy_artifacts_step` into a Python template string? In the future we could keep all the highly descriptive code snippets (YAML with a few parameters) in a separate folder as opposed to mixing them with the Python code. We could track that in a separate issue :-)
>
> **Reply (Member Author):** I agree, there are big chunks of code templates introduced for resourceOp and artifacts because we need an extra step/task to support these additional features. There's still some merging happening for the volume and volume snapshot ops, along with some new changes to the resource op that Vincent made yesterday. I can open an issue to refactor this file and compiler.py.

'name': 'copy-artifacts',
'script': textwrap.dedent('''\
#!/usr/bin/env sh
mc config host add storage %s $AWS_ACCESS_KEY_ID $AWS_SECRET_ACCESS_KEY
''' % (endpoint)),
'env': [
{'name': 'PIPELINERUN', 'valueFrom': {'fieldRef': {'fieldPath': "metadata.labels['tekton.dev/pipelineRun']"}}},
{'name': 'PODNAME', 'valueFrom': {'fieldRef': {'fieldPath': "metadata.name"}}},
{'name': 'NAMESPACE', 'valueFrom': {'fieldRef': {'fieldPath': "metadata.namespace"}}},
{'name': 'AWS_ACCESS_KEY_ID', 'valueFrom': {'secretKeyRef': {'name': access_key['name'], 'key': access_key['key']}}},
{'name': 'AWS_SECRET_ACCESS_KEY', 'valueFrom': {'secretKeyRef': {'name': secret_access_key['name'], 'key': secret_access_key['key']}}}
]
}
mounted_artifact_paths = []
for artifact in outputs_dict['artifacts']:
if artifact['name'] in replaced_param_list:
copy_artifacts_step['script'] = copy_artifacts_step['script'] + \
'mc cp $(results.%s.path) storage/%s/runs/$PIPELINERUN/$PODNAME/%s' % (name, bucket, artifact['path'].rsplit("/", 1)[1])
else:
copy_artifacts_step['script'] = copy_artifacts_step['script'] + \
'mc cp %s storage/%s/runs/$PIPELINERUN/$PODNAME/%s' % (artifact['path'], bucket, artifact['path'].rsplit("/", 1)[1])
if artifact['path'].rsplit("/", 1)[0] not in mounted_artifact_paths:
volume_mount_step_template.append({'name': artifact['name'], 'mountPath': artifact['path'].rsplit("/", 1)[0]})
volume_template.append({'name': artifact['name'], 'emptyDir': {}})
mounted_artifact_paths.append(artifact['path'].rsplit("/", 1)[0])
if mounted_param_paths:
copy_results_step['script'] = literal_str(copy_results_step['script'])
template['spec']['steps'].append(copy_results_step)
if volume_mount_step_template:
template['spec']['stepTemplate'] = {}
template['spec']['stepTemplate']['volumeMounts'] = volume_mount_step_template
template['spec']['volumes'] = volume_template
if copy_artifacts_step:
copy_artifacts_step['script'] = literal_str(copy_artifacts_step['script'])
template['spec']['steps'].append(copy_artifacts_step)

# **********************************************************
# NOTE: the following features are still under development