Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support input artifact #104

Merged
merged 4 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import yaml
import re
import os
import copy

from .. import tekton_api_version

Expand Down Expand Up @@ -213,6 +214,17 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
steps.extend(template['spec']['steps'])
template['spec']['steps'] = steps

# initial base_step and volume setup
base_step = {
'image': 'busybox',
'name': 'copy-results',
'script': '#!/bin/sh\nset -exo pipefail\n'
}
volume_mount_step_template = []
volume_template = []
mounted_param_paths = []
replaced_param_list = []

# inputs
input_artifact_paths = processed_op.input_artifact_paths if isinstance(processed_op, dsl.ContainerOp) else None
artifact_arguments = processed_op.artifact_arguments if isinstance(processed_op, dsl.ContainerOp) else None
Expand All @@ -222,8 +234,27 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
template['spec']['params'] = inputs['parameters']
elif isinstance(op, dsl.ResourceOp):
template['spec']['params'].extend(inputs['parameters'])
elif 'artifacts' in inputs:
raise NotImplementedError("input artifacts are not yet implemented")
if 'artifacts' in inputs:
# The input artifacts in KFP is not pulling from s3, it will always be passed as a raw input.
# Visit https://github.com/kubeflow/pipelines/issues/336 for more details on the implementation.
copy_inputs_step = copy.deepcopy(base_step)
copy_inputs_step['name'] = 'copy-inputs'
for artifact in inputs['artifacts']:
if 'raw' in artifact:
copy_inputs_step['script'] += 'echo -n "%s" > %s\n' % (artifact['raw']['data'], artifact['path'])
mountPath = artifact['path'].rsplit("/", 1)[0]
if mountPath not in mounted_param_paths:
volume_mount_step_template.append({'name': artifact['name'], 'mountPath': artifact['path'].rsplit("/", 1)[0]})
volume_template.append({'name': artifact['name'], 'emptyDir': {}})
mounted_param_paths.append(mountPath)
copy_inputs_step['script'] = literal_str(copy_inputs_step['script'])
with_inputs_step = [copy_inputs_step]
with_inputs_step.extend(template['spec']['steps'])
template['spec']['steps'] = with_inputs_step
if volume_mount_step_template:
template['spec']['stepTemplate'] = {}
template['spec']['stepTemplate']['volumeMounts'] = volume_mount_step_template
template['spec']['volumes'] = volume_template

# outputs
if isinstance(op, dsl.ContainerOp):
Expand All @@ -234,10 +265,6 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
param_outputs = {}
outputs_dict = _outputs_to_json(op, op_outputs, param_outputs, output_artifacts)
if outputs_dict:
volume_mount_step_template = []
volume_template = []
mounted_param_paths = []
replaced_param_list = []
if outputs_dict.get('parameters'):
"""
Since Tekton results need to be under /tekton/results. If file output paths cannot be
Expand All @@ -254,11 +281,7 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
cp $LOCALPATH $(results.data.path);
"""
template['spec']['results'] = []
copy_results_step = {
'image': 'busybox',
'name': 'copy-results',
'script': '#!/bin/sh\nset -exo pipefail\n'
}
copy_results_step = copy.deepcopy(base_step)
for name, path in processed_op.file_outputs.items():
name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore
template['spec']['results'].append({
Expand Down
9 changes: 8 additions & 1 deletion sdk/python/tests/compiler/compiler_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,18 @@ def test_pipeline_transformers_workflow(self):

def test_artifact_location_workflow(self):
"""
Test compiling a artifact location workflow.
Test compiling an artifact location workflow.
"""
from .testdata.artifact_location import custom_artifact_location
self._test_pipeline_workflow(custom_artifact_location, 'artifact_location.yaml', enable_artifacts=True)

def test_input_artifact_raw_value_workflow(self):
"""
Test compiling an input artifact workflow.
"""
from .testdata.input_artifact_raw_value import input_artifact_pipeline
self._test_pipeline_workflow(input_artifact_pipeline, 'input_artifact_raw_value.yaml')

def test_katib_workflow(self):
"""
Test compiling a katib workflow.
Expand Down
70 changes: 70 additions & 0 deletions sdk/python/tests/compiler/testdata/input_artifact_raw_value.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import sys
from pathlib import Path

sys.path.insert(0, __file__ + '/../../../../')

import kfp
from kfp import dsl


def component_with_inline_input_artifact(text: str):
return dsl.ContainerOp(
name='component_with_inline_input_artifact',
image='alpine',
command=['cat', dsl.InputArgumentPath(text, path='/tmp/inputs/text/data', input='text')], # path and input are optional
)


def component_with_input_artifact(text):
'''A component that passes text as input artifact'''

return dsl.ContainerOp(
name='component_with_input_artifact',
artifact_argument_paths=[
dsl.InputArgumentPath(argument=text, path='/tmp/inputs/text/data', input='text'), # path and input are optional
],
image='alpine',
command=['cat', '/tmp/inputs/text/data'],
)

def component_with_hardcoded_input_artifact_value():
'''A component that passes hard-coded text as input artifact'''
return component_with_input_artifact('hard-coded artifact value')


def component_with_input_artifact_value_from_file(file_path):
'''A component that passes contents of a file as input artifact'''
return component_with_input_artifact(Path(file_path).read_text())


@dsl.pipeline(
name='Pipeline with artifact input raw argument value.',
description='Pipeline shows how to define artifact inputs and pass raw artifacts to them.'
)
def input_artifact_pipeline():
component_with_inline_input_artifact('Constant artifact value')
component_with_input_artifact('Constant artifact value')
component_with_hardcoded_input_artifact_value()

file_path = str(Path(__file__).parent.joinpath('input_artifact_raw_value.txt'))
component_with_input_artifact_value_from_file(file_path)

if __name__ == '__main__':
# don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(input_artifact_pipeline, __file__.replace('.py', '.yaml'))
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Text from a file with hard-coded artifact value
142 changes: 142 additions & 0 deletions sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: component-with-inline-input-artifact
spec:
stepTemplate:
volumeMounts:
- mountPath: /tmp/inputs/text
name: text
steps:
- image: busybox
name: copy-inputs
script: |
#!/bin/sh
set -exo pipefail
echo -n "Constant artifact value" > /tmp/inputs/text/data
- command:
- cat
- /tmp/inputs/text/data
image: alpine
name: component-with-inline-input-artifact
volumes:
- emptyDir: {}
name: text
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: component-with-input-artifact
spec:
stepTemplate:
volumeMounts:
- mountPath: /tmp/inputs/text
name: text
steps:
- image: busybox
name: copy-inputs
script: |
#!/bin/sh
set -exo pipefail
echo -n "Constant artifact value" > /tmp/inputs/text/data
- command:
- cat
- /tmp/inputs/text/data
image: alpine
name: component-with-input-artifact
volumes:
- emptyDir: {}
name: text
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: component-with-input-artifact-2
spec:
stepTemplate:
volumeMounts:
- mountPath: /tmp/inputs/text
name: text
steps:
- image: busybox
name: copy-inputs
script: |
#!/bin/sh
set -exo pipefail
echo -n "hard-coded artifact value" > /tmp/inputs/text/data
- command:
- cat
- /tmp/inputs/text/data
image: alpine
name: component-with-input-artifact-2
volumes:
- emptyDir: {}
name: text
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: component-with-input-artifact-3
spec:
stepTemplate:
volumeMounts:
- mountPath: /tmp/inputs/text
name: text
steps:
- image: busybox
name: copy-inputs
script: |
#!/bin/sh
set -exo pipefail
echo -n "Text from a file with hard-coded artifact value
" > /tmp/inputs/text/data
- command:
- cat
- /tmp/inputs/text/data
image: alpine
name: component-with-input-artifact-3
volumes:
- emptyDir: {}
name: text
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "Pipeline shows how to
define artifact inputs and pass raw artifacts to them.", "name": "Pipeline with
artifact input raw argument value."}'
name: pipeline-with-artifact-input-raw-argument-value
spec:
params: []
tasks:
- name: component-with-inline-input-artifact
params: []
taskRef:
name: component-with-inline-input-artifact
- name: component-with-input-artifact
params: []
taskRef:
name: component-with-input-artifact
- name: component-with-input-artifact-2
params: []
taskRef:
name: component-with-input-artifact-2
- name: component-with-input-artifact-3
params: []
taskRef:
name: component-with-input-artifact-3
2 changes: 1 addition & 1 deletion sdk/python/tests/test_kfp_samples_report.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SUCCESS: basic_no_decorator.py
FAILURE: coin.py
SUCCESS: compose.py
SUCCESS: default_value.py
FAILURE: input_artifact_raw_value.py
SUCCESS: input_artifact_raw_value.py
FAILURE: loop_over_lightweight_output.py
SUCCESS: param_op_transform.py
SUCCESS: param_substitutions.py
Expand Down