Commit

feat(sdk): support dynamic machine type parameters in pipeline task setters (#11097)

* temp title: change title

Signed-off-by: KevinGrantLee <[email protected]>

* add release notes

Signed-off-by: KevinGrantLee <[email protected]>

* formatting

Signed-off-by: KevinGrantLee <[email protected]>

* feat(backend): move comp logic to workflow params (#10979)

* feat(backend): move comp logic to workflow params

Signed-off-by: zazulam <[email protected]>
Co-authored-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: MonicaZhang1 <[email protected]>
Co-authored-by: kylekaminky <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
Signed-off-by: zazulam <[email protected]>

* address pr comments

Signed-off-by: zazulam <[email protected]>

* Use function name instead of base name and address edge cases

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Improve logic and update tests

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* POC hashing command and args

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Add comments to clarify the logic

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

* Hash entire PipelineContainerSpec

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: zazulam <[email protected]>

---------

Signed-off-by: zazulam <[email protected]>
Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: MonicaZhang1 <[email protected]>
Co-authored-by: kylekaminky <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* feat(component): internal

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 660985413
Signed-off-by: KevinGrantLee <[email protected]>

* feat(components): internal

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 661332120
Signed-off-by: KevinGrantLee <[email protected]>

* fix(components): Fix to model batch explanation component for Structured Data pipelines

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 661475667
Signed-off-by: KevinGrantLee <[email protected]>

* feat(components): Support dynamic values for boot_disk_type, boot_disk_size in preview.custom_job.utils.create_custom_training_job_from_component

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 662242688
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Upgrade Argo to v3.4.17 (#10978)

Signed-off-by: Giulio Frasca <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* test: Moved kubeflow-pipelines-manifests to GitHub Actions (#11066)

Signed-off-by: vmudadla <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* fix: re-enable exit handler test. (#11100)

Signed-off-by: Liav Weiss (EXT-Nokia) <[email protected]>
Co-authored-by: Liav Weiss (EXT-Nokia) <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* fix(frontend): retrieve archived logs from correct location (#11010)

* fix(frontend): retrieve archived logs from correct location

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>

* Add namespace tag handling and validation

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>

* Remove whitespace from keyFormat

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>

* Update frontend unit tests

Signed-off-by: droctothorpe <[email protected]>

* Remove superfluous log statements

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: quinnovator <[email protected]>

* Add link to keyFormat in manifests

Signed-off-by: droctothorpe <[email protected]>

* Fix workflow parsing for log artifact

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: quinnovator <[email protected]>

* Fix unit test

Signed-off-by: droctothorpe <[email protected]>

---------

Signed-off-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: owmasch <[email protected]>
Co-authored-by: quinnovator <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* feat(component): internal

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 663774557
Signed-off-by: KevinGrantLee <[email protected]>

* feat(component): internal

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 663872006
Signed-off-by: KevinGrantLee <[email protected]>

* chore(components): GCPC 2.16.1 Release

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 663883139
Signed-off-by: KevinGrantLee <[email protected]>

* test: Fail fast when image build fails on tests #11102 (#11115)

* Fail fast when image build fails on tests #11102

Signed-off-by: Elay Aharoni (EXT-Nokia) <[email protected]>

* Fail fast when image build fails on tests #11102

Signed-off-by: Elay Aharoni (EXT-Nokia) <[email protected]>

---------

Signed-off-by: Elay Aharoni (EXT-Nokia) <[email protected]>
Co-authored-by: Elay Aharoni (EXT-Nokia) <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* fix(components): Use instance.target_field_name format for text-bison models only, use target_field_name for gemini models

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 665638487
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Renamed GitHub workflows from *.yaml to *.yml for consistency (#11126)

Signed-off-by: hbelmiro <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* Fix view edit cluster roles (#11067)

* Fixing incorrect typing in loop_parallelism example

Signed-off-by: Oswaldo Gomez <[email protected]>

* Fixing samples/core/loop_parameter example

Signed-off-by: Oswaldo Gomez <[email protected]>

* Fixing aggregate-to-kubeflow-pipelines-edit

Signed-off-by: Oswaldo Gomez <[email protected]>

* keeping MRs separate.

Signed-off-by: Oswaldo Gomez <[email protected]>

* Adding blank line

Signed-off-by: Oswaldo Gomez <[email protected]>

---------

Signed-off-by: Oswaldo Gomez <[email protected]>
Co-authored-by: Oswaldo Gomez <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* fix(components): Pass model name to eval_runner to process batch prediction's output as per the output schema of the model used

Signed-off-by: Googler <[email protected]>
PiperOrigin-RevId: 665977093
Signed-off-by: KevinGrantLee <[email protected]>

* feat(components): release LLM Model Evaluation image version v0.7

Signed-off-by: Jason Dai <[email protected]>
PiperOrigin-RevId: 666102687
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Adding @DharmitD to SDK reviewers (#11131)

Signed-off-by: ddalvi <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* test: Kubeflow Pipelines V2 integration Tests (#11125)

Signed-off-by: Diego Lovison <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Add make targets for building driver and launcher images (#11103)

Signed-off-by: Giulio Frasca <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* feat(Backend + SDK): Update kfp backend and kubernetes sdk to support EmptyDir (#10913)

Update kfp backend and kubernetes sdk to support mounting EmptyDir
volumes to task pods.

Inspired by #10427

Fixes: #10656

Signed-off-by: Greg Sheremeta <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* docs: fixing broken links in readme (#11108)

Signed-off-by: Fiona Waters <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore(deps): bump micromatch from 4.0.5 to 4.0.8 in /test/frontend-integration-test (#11132)

Bumps [micromatch](https://github.com/micromatch/micromatch) from 4.0.5 to 4.0.8.
- [Release notes](https://github.com/micromatch/micromatch/releases)
- [Changelog](https://github.com/micromatch/micromatch/blob/4.0.8/CHANGELOG.md)
- [Commits](micromatch/micromatch@4.0.5...4.0.8)

---
updated-dependencies:
- dependency-name: micromatch
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: KevinGrantLee <[email protected]>

* Fix: Basic sample tests - sequential is flaky (#11138)

Signed-off-by: Diego Lovison <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Wrapped "Failed GetContextByTypeAndName" error for better troubleshooting (#11098)

Signed-off-by: hbelmiro <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore(components): Update AutoSxS and RLHF image tags

Signed-off-by: Michael Hu <[email protected]>
PiperOrigin-RevId: 668536503
Signed-off-by: KevinGrantLee <[email protected]>

* test: Improvements to wait_for_pods function (#11162)

Signed-off-by: hbelmiro <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* fix(frontend): fixes filter pipeline text box shows error when typing anything in it. Fixes #10241 (#11096)

* Filter pipeline text box shows error when typing anything in it #10241

Signed-off-by: Elay Aharoni (EXT-Nokia) <[email protected]>

* Filter pipeline text box shows error when typing anything in it #10241

Signed-off-by: Elay Aharoni (EXT-Nokia) <[email protected]>

---------

Signed-off-by: Elay Aharoni (EXT-Nokia) <[email protected]>
Co-authored-by: Elay Aharoni (EXT-Nokia) <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* correct artifact preview behavior in UI (#11059)

This change allows the KFP UI to fall back to the UI host namespace when no
namespace is provided for the artifact object store provider secret. In
default Kubeflow deployments this namespace is simply "kubeflow"; users can
customize this behavior by setting the "SERVER_NAMESPACE" environment
variable on the KFP UI deployment.

Furthermore, this change fixes a bug that caused URL parsing to fail for
endpoints without a protocol. Object store endpoints of the form
<ip>:<port> are now supported, as used in the default KFP deployment
manifests.

Signed-off-by: Humair Khan <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Added DCO link to PR template (#11176)

Signed-off-by: Helber Belmiro <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore(backend): Update driver and launcher licenses (#11177)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore(backend): update driver and launcher default images (#11178)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Add instructions for releasing driver and launcher images (#11179)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* test: Fixed `kfp-runtime-tests` to run on master branch (#11158)

Signed-off-by: hbelmiro <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* (fix): reduce executor logs (#11169)

* remove driver logs from executor

These logs congest the executor runtime logs making it difficult for the
user to differentiate between logs. The driver logs are unnecessary here
and can be removed to reduce this clutter.

Signed-off-by: Humair Khan <[email protected]>

* remove duplicate emissary call in executor

As per the initial inline dev comment, argo podspecpatch did not add the
emissary call, so it had to be added manually. This was fixed a couple of
argo versions back. As a result, the executor pod now calls the executor
twice, which also produces superfluous logs.

This change removes the additional call to emissary to resolve this.

Signed-off-by: Humair Khan <[email protected]>

---------

Signed-off-by: Humair Khan <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: add PaulinaPacyna and ouadakarim as reviewers (#11180)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* test: Move run-all-gcpc-modules to GitHub Actions  (#11157)

* add gcpc modules tests to gha

Signed-off-by: Amanpreet Singh Bedi <[email protected]>

* remove run-all-gcpc-modules test driver script

Signed-off-by: Amanpreet Singh Bedi <[email protected]>

* fix path under gcpc modules tests github action

Signed-off-by: Amanpreet Singh Bedi <[email protected]>

* upgrade ubuntu base image

Signed-off-by: Amanpreet Singh Bedi <[email protected]>

* upgrade python version to 3.9

Signed-off-by: Amanpreet Singh Bedi <[email protected]>

---------

Signed-off-by: Amanpreet Singh Bedi <[email protected]>
Signed-off-by: Amanpreet Singh Bedi <[email protected]>
Co-authored-by: Amanpreet Singh Bedi <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* fix(sdk): Kfp support for pip trusted host (#11151)

Signed-off-by: Diego Lovison <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore(sdk): Loosening kubernetes dependency constraint (#11079)

* Loosening kubernetes dependency constraint

Signed-off-by: egeucak <[email protected]>

* added setuptools in test script

Signed-off-by: egeucak <[email protected]>

---------

Signed-off-by: egeucak <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Remove unwanted Frontend test files (#10973)

Signed-off-by: ddalvi <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* fix(ui): fixes empty string value in pipeline parameters (#11175)

Signed-off-by: Jan Staněk <[email protected]>
Co-authored-by: Jan Staněk <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore(backend): update driver and launcher default images (#11182)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore(release): bumped version to 2.3.0

Signed-off-by: KevinGrantLee <[email protected]>

* chore: Update RELEASE.md to remove obsolete instructions (#11183)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: Release kfp-pipeline-spec 0.4.0 (#11189)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: release kfp-kubernetes 1.3.0 (#11190)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore: update kfp-kubernetes release scripts and instructions (#11191)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* feat(sdk)!: Pin kfp-pipeline-spec==0.4.0, kfp-server-api>=2.1.0,<2.4.0 (#11192)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* chore(sdk): release KFP SDK 2.9.0 (#11193)

Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: KevinGrantLee <[email protected]>

* Delete test pipelines as they duplicate
pipeline_with_resource_spec

Signed-off-by: KevinGrantLee <[email protected]>

---------

Signed-off-by: KevinGrantLee <[email protected]>
Signed-off-by: zazulam <[email protected]>
Signed-off-by: droctothorpe <[email protected]>
Signed-off-by: Googler <[email protected]>
Signed-off-by: Giulio Frasca <[email protected]>
Signed-off-by: vmudadla <[email protected]>
Signed-off-by: Liav Weiss (EXT-Nokia) <[email protected]>
Signed-off-by: Elay Aharoni (EXT-Nokia) <[email protected]>
Signed-off-by: hbelmiro <[email protected]>
Signed-off-by: Oswaldo Gomez <[email protected]>
Signed-off-by: Jason Dai <[email protected]>
Signed-off-by: ddalvi <[email protected]>
Signed-off-by: Diego Lovison <[email protected]>
Signed-off-by: Greg Sheremeta <[email protected]>
Signed-off-by: Fiona Waters <[email protected]>
Signed-off-by: dependabot[bot] <[email protected]>
Signed-off-by: Michael Hu <[email protected]>
Signed-off-by: Humair Khan <[email protected]>
Signed-off-by: Helber Belmiro <[email protected]>
Signed-off-by: Chen Sun <[email protected]>
Signed-off-by: Amanpreet Singh Bedi <[email protected]>
Signed-off-by: Amanpreet Singh Bedi <[email protected]>
Signed-off-by: egeucak <[email protected]>
Signed-off-by: Jan Staněk <[email protected]>
Co-authored-by: Michael <[email protected]>
Co-authored-by: droctothorpe <[email protected]>
Co-authored-by: andreafehrman <[email protected]>
Co-authored-by: MonicaZhang1 <[email protected]>
Co-authored-by: kylekaminky <[email protected]>
Co-authored-by: CarterFendley <[email protected]>
Co-authored-by: Googler <[email protected]>
Co-authored-by: Giulio Frasca <[email protected]>
Co-authored-by: Vani Haripriya Mudadla <[email protected]>
Co-authored-by: Liav Weiss <[email protected]>
Co-authored-by: Liav Weiss (EXT-Nokia) <[email protected]>
Co-authored-by: owmasch <[email protected]>
Co-authored-by: quinnovator <[email protected]>
Co-authored-by: ElayAharoni <[email protected]>
Co-authored-by: Elay Aharoni (EXT-Nokia) <[email protected]>
Co-authored-by: Helber Belmiro <[email protected]>
Co-authored-by: Oswaldo Gomez <[email protected]>
Co-authored-by: Oswaldo Gomez <[email protected]>
Co-authored-by: Jason Dai <[email protected]>
Co-authored-by: Dharmit Dalvi <[email protected]>
Co-authored-by: Diego Lovison <[email protected]>
Co-authored-by: Greg Sheremeta <[email protected]>
Co-authored-by: Fiona Waters <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Michael Hu <[email protected]>
Co-authored-by: Humair Khan <[email protected]>
Co-authored-by: Chen Sun <[email protected]>
Co-authored-by: aman23bedi <[email protected]>
Co-authored-by: Amanpreet Singh Bedi <[email protected]>
Co-authored-by: ege uçak <[email protected]>
Co-authored-by: Jan Staněk <[email protected]>
Co-authored-by: Jan Staněk <[email protected]>
1 parent 0b92f86 commit 94b1a0d
Showing 11 changed files with 608 additions and 130 deletions.
1 change: 1 addition & 0 deletions sdk/RELEASE.md
@@ -1,6 +1,7 @@
# Current Version (in development)

## Features
* Support dynamic machine type parameters in pipeline task setters. [\#11097](https://github.com/kubeflow/pipelines/pull/11097)

## Breaking changes

24 changes: 12 additions & 12 deletions sdk/python/kfp/compiler/compiler_test.py
@@ -3382,31 +3382,31 @@ def simple_pipeline():
['exec-return-1']['container'])

self.assertEqual(
5, dict_format['deploymentSpec']['executors']['exec-return-1-2']
['container']['resources']['cpuLimit'])
'5', dict_format['deploymentSpec']['executors']['exec-return-1-2']
['container']['resources']['resourceCpuLimit'])
self.assertNotIn(
'memoryLimit', dict_format['deploymentSpec']['executors']
['exec-return-1-2']['container']['resources'])

self.assertEqual(
50, dict_format['deploymentSpec']['executors']['exec-return-1-3']
['container']['resources']['memoryLimit'])
'50G', dict_format['deploymentSpec']['executors']['exec-return-1-3']
['container']['resources']['resourceMemoryLimit'])
self.assertNotIn(
'cpuLimit', dict_format['deploymentSpec']['executors']
['exec-return-1-3']['container']['resources'])

self.assertEqual(
2, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['cpuRequest'])
'2', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceCpuRequest'])
self.assertEqual(
5, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['cpuLimit'])
'5', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceCpuLimit'])
self.assertEqual(
4, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['memoryRequest'])
'4G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceMemoryRequest'])
self.assertEqual(
50, dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['memoryLimit'])
'50G', dict_format['deploymentSpec']['executors']['exec-return-1-4']
['container']['resources']['resourceMemoryLimit'])


class TestPlatformConfig(unittest.TestCase):
49 changes: 39 additions & 10 deletions sdk/python/kfp/compiler/pipeline_spec_builder.py
@@ -128,6 +128,12 @@ def build_task_spec_for_task(
pipeline_task_spec.retry_policy.CopyFrom(
task._task_spec.retry_policy.to_proto())

# Inject resource fields into inputs
if task.container_spec and task.container_spec.resources:
for key, val in task.container_spec.resources.__dict__.items():
if val and pipeline_channel.extract_pipeline_channels_from_any(val):
task.inputs[key] = val

for input_name, input_value in task.inputs.items():
# Since LoopParameterArgument and LoopArtifactArgument and LoopArgumentVariable are narrower
# types than PipelineParameterChannel, start with them.
@@ -607,6 +613,24 @@ def build_container_spec_for_task(
Returns:
A PipelineContainerSpec object for the task.
"""

def convert_to_placeholder(input_value: str) -> str:
"""Checks if input is a pipeline channel and if so, converts to
compiler injected input name."""
pipeline_channels = (
pipeline_channel.extract_pipeline_channels_from_any(input_value))
if pipeline_channels:
assert len(pipeline_channels) == 1
channel = pipeline_channels[0]
additional_input_name = (
compiler_utils.additional_input_name_for_pipeline_channel(
channel))
additional_input_placeholder = placeholders.InputValuePlaceholder(
additional_input_name)._to_string()
input_value = input_value.replace(channel.pattern,
additional_input_placeholder)
return input_value

container_spec = (
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec(
image=task.container_spec.image,
@@ -620,23 +644,28 @@ def build_container_spec_for_task(

if task.container_spec.resources is not None:
if task.container_spec.resources.cpu_request is not None:
container_spec.resources.cpu_request = (
task.container_spec.resources.cpu_request)
container_spec.resources.resource_cpu_request = (
convert_to_placeholder(
task.container_spec.resources.cpu_request))
if task.container_spec.resources.cpu_limit is not None:
container_spec.resources.cpu_limit = (
task.container_spec.resources.cpu_limit)
container_spec.resources.resource_cpu_limit = (
convert_to_placeholder(task.container_spec.resources.cpu_limit))
if task.container_spec.resources.memory_request is not None:
container_spec.resources.memory_request = (
task.container_spec.resources.memory_request)
container_spec.resources.resource_memory_request = (
convert_to_placeholder(
task.container_spec.resources.memory_request))
if task.container_spec.resources.memory_limit is not None:
container_spec.resources.memory_limit = (
task.container_spec.resources.memory_limit)
container_spec.resources.resource_memory_limit = (
convert_to_placeholder(
task.container_spec.resources.memory_limit))
if task.container_spec.resources.accelerator_count is not None:
container_spec.resources.accelerator.CopyFrom(
pipeline_spec_pb2.PipelineDeploymentConfig.PipelineContainerSpec
.ResourceSpec.AcceleratorConfig(
type=task.container_spec.resources.accelerator_type,
count=task.container_spec.resources.accelerator_count,
resource_type=convert_to_placeholder(
task.container_spec.resources.accelerator_type),
resource_count=convert_to_placeholder(
task.container_spec.resources.accelerator_count),
))

return container_spec
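
The `convert_to_placeholder` helper in the hunks above swaps a pipeline-channel reference embedded in a resource string for a compiler-injected input placeholder, and leaves static values untouched. The sketch below illustrates that idea without importing the SDK. The channel string format, the `pipelinechannel--` naming scheme, and the placeholder syntax are assumptions made for illustration; the real definitions live in `pipeline_channel`, `compiler_utils`, and `placeholders` and may differ.

```python
import re

# Assumed serialized form of a pipeline channel (illustrative only).
_CHANNEL_RE = re.compile(
    r'\{\{channel:task=(?P<task>[^;]*);name=(?P<name>[^;]+);'
    r'type=(?P<type>[^;]*);\}\}')


def additional_input_name(task: str, name: str) -> str:
    """Hypothetical naming scheme for the compiler-injected input."""
    if task:
        return f'pipelinechannel--{task}-{name}'
    return f'pipelinechannel--{name}'


def convert_to_placeholder(input_value: str) -> str:
    """Replaces a single channel reference with an input-value placeholder."""
    matches = list(_CHANNEL_RE.finditer(input_value))
    if not matches:
        # Static value such as '5' or '50G': nothing to substitute.
        return input_value
    # Mirrors the assertion in the diff: at most one channel per value.
    assert len(matches) == 1
    match = matches[0]
    name = additional_input_name(match['task'], match['name'])
    # Assumed placeholder syntax for a parameter input.
    placeholder = f"{{{{$.inputs.parameters['{name}']}}}}"
    return input_value.replace(match.group(0), placeholder)


static_value = convert_to_placeholder('50G')
dynamic_value = convert_to_placeholder(
    '{{channel:task=;name=cpu_limit;type=String;}}')
```

Under these assumptions, a static value passes through unchanged, while a channel reference becomes a placeholder that can be resolved at run time from the task's injected inputs.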
116 changes: 57 additions & 59 deletions sdk/python/kfp/dsl/pipeline_task.py
@@ -321,9 +321,9 @@ def _ensure_container_spec_exists(self) -> None:
f'{caller_method_name} can only be used on single-step components, not pipelines used as components, or special components like importers.'
)

def _validate_cpu_request_limit(self, cpu: str) -> float:
def _validate_cpu_request_limit(self, cpu: str) -> str:
"""Validates cpu request/limit string and converts to its numeric
value.
string value.
Args:
cpu: CPU requests or limits. This string should be a number or a
@@ -335,17 +335,22 @@ def _validate_cpu_request_limit(self, cpu: str) -> float:
ValueError if the cpu request/limit string value is invalid.
Returns:
The numeric value (float) of the cpu request/limit.
The numeric string of the cpu request/limit.
"""
if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
raise ValueError(
'Invalid cpu string. Should be float or integer, or integer'
' followed by "m".')

return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)
if isinstance(cpu, pipeline_channel.PipelineChannel):
cpu = str(cpu)
else:
if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
raise ValueError(
'Invalid cpu string. Should be float or integer, or integer'
' followed by "m".')
return cpu

@block_if_final()
def set_cpu_request(self, cpu: str) -> 'PipelineTask':
def set_cpu_request(
self,
cpu: Union[str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets CPU request (minimum) for the task.
Args:
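
The CPU validation change above can be read as: pass a pipeline channel through as its string form, and validate anything else against the existing regular expression without converting it to a float. A minimal sketch of that control flow follows. `FakeChannel` is a hypothetical stand-in for `pipeline_channel.PipelineChannel`, and its string format is an assumption.

```python
import re


class FakeChannel:
    """Hypothetical stand-in for pipeline_channel.PipelineChannel."""

    def __init__(self, name: str):
        self.name = name

    def __str__(self) -> str:
        # Assumed serialized channel form, for illustration only.
        return f'{{{{channel:task=;name={self.name};type=String;}}}}'


def validate_cpu(cpu) -> str:
    """Returns the CPU value as a string; static values are validated."""
    if isinstance(cpu, FakeChannel):
        # Dynamic value: cannot be validated at compile time.
        return str(cpu)
    if re.match(r'([0-9]*[.])?[0-9]+m?$', cpu) is None:
        raise ValueError(
            'Invalid cpu string. Should be float or integer, or integer'
            ' followed by "m".')
    return cpu


static_cpu = validate_cpu('500m')
dynamic_cpu = validate_cpu(FakeChannel('cpu_limit'))
```

Because a channel's value is only known at run time, the sketch skips validation for it, which matches the branch structure shown in the diff.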
@@ -370,7 +375,10 @@ def set_cpu_request(self, cpu: str) -> 'PipelineTask':
return self

@block_if_final()
def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
def set_cpu_limit(
self,
cpu: Union[str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets CPU limit (maximum) for the task.
Args:
@@ -395,7 +403,9 @@ def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
return self

@block_if_final()
def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
def set_accelerator_limit(
self, limit: Union[int, str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets accelerator limit (maximum) for the task. Only applies if
accelerator type is also set via .set_accelerator_type().
@@ -406,11 +416,15 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()

if isinstance(limit, str):
if re.match(r'[1-9]\d*$', limit) is None:
raise ValueError(f'{"limit"!r} must be positive integer.')
limit = int(limit)
if isinstance(limit, pipeline_channel.PipelineChannel):
limit = str(limit)
else:
if isinstance(limit, int):
limit = str(limit)
if isinstance(limit, str) and re.match(r'^0$|^1$|^2$|^4$|^8$|^16$',
limit) is None:
raise ValueError(
f'{"limit"!r} must be one of 0, 1, 2, 4, 8, 16.')

if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_count = limit
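
For static values, the new accelerator-limit branch above normalizes integers to strings and then accepts only the counts 0, 1, 2, 4, 8, and 16. The sketch below isolates that step; the function name `normalize_accelerator_limit` is hypothetical, and the pipeline-channel branch is omitted.

```python
import re
from typing import Union

# Same alternation as in the diff: only these counts are accepted.
_ALLOWED_COUNT_RE = re.compile(r'^0$|^1$|^2$|^4$|^8$|^16$')


def normalize_accelerator_limit(limit: Union[int, str]) -> str:
    """Converts an int limit to str and checks it against the allowed set."""
    if isinstance(limit, int):
        limit = str(limit)
    if _ALLOWED_COUNT_RE.match(limit) is None:
        raise ValueError("'limit' must be one of 0, 1, 2, 4, 8, 16.")
    return limit


from_int = normalize_accelerator_limit(8)
from_str = normalize_accelerator_limit('16')
```

Note that this is stricter than the removed check, which accepted any positive integer string: a value such as `3` is now rejected.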
@@ -438,9 +452,9 @@ def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
category=DeprecationWarning)
return self.set_accelerator_limit(gpu)

def _validate_memory_request_limit(self, memory: str) -> float:
def _validate_memory_request_limit(self, memory: str) -> str:
"""Validates memory request/limit string and converts to its numeric
value.
string value.
Args:
memory: Memory requests or limits. This string should be a number or
@@ -451,47 +465,24 @@ def _validate_memory_request_limit(self, memory: str) -> float:
ValueError if the memory request/limit string value is invalid.
Returns:
The numeric value (float) of the memory request/limit.
The numeric string value of the memory request/limit.
"""
if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
memory) is None:
raise ValueError(
'Invalid memory string. Should be a number or a number '
'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", '
'"Gi", "M", "Mi", "K", "Ki".')

if memory.endswith('E'):
memory = float(memory[:-1]) * constants._E / constants._G
elif memory.endswith('Ei'):
memory = float(memory[:-2]) * constants._EI / constants._G
elif memory.endswith('P'):
memory = float(memory[:-1]) * constants._P / constants._G
elif memory.endswith('Pi'):
memory = float(memory[:-2]) * constants._PI / constants._G
elif memory.endswith('T'):
memory = float(memory[:-1]) * constants._T / constants._G
elif memory.endswith('Ti'):
memory = float(memory[:-2]) * constants._TI / constants._G
elif memory.endswith('G'):
memory = float(memory[:-1])
elif memory.endswith('Gi'):
memory = float(memory[:-2]) * constants._GI / constants._G
elif memory.endswith('M'):
memory = float(memory[:-1]) * constants._M / constants._G
elif memory.endswith('Mi'):
memory = float(memory[:-2]) * constants._MI / constants._G
elif memory.endswith('K'):
memory = float(memory[:-1]) * constants._K / constants._G
elif memory.endswith('Ki'):
memory = float(memory[:-2]) * constants._KI / constants._G
if isinstance(memory, pipeline_channel.PipelineChannel):
memory = str(memory)
else:
# By default interpret as a plain integer, in the unit of Bytes.
memory = float(memory) / constants._G

if re.match(r'^[0-9]+(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki){0,1}$',
memory) is None:
raise ValueError(
'Invalid memory string. Should be a number or a number '
'followed by one of "E", "Ei", "P", "Pi", "T", "Ti", "G", '
'"Gi", "M", "Mi", "K", "Ki".')
return memory

@block_if_final()
def set_memory_request(self, memory: str) -> 'PipelineTask':
def set_memory_request(
self,
memory: Union[str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets memory request (minimum) for the task.
Args:
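
The memory hunk above removes the unit conversion (everything was normalized to a float in units of G) and keeps the validated string instead, which is why the compiler test now expects `'50G'` rather than `50`. The sketch below contrasts the two behaviors. The unit table is an assumption standing in for `constants._K`, `constants._KI`, and so on (decimal suffixes as powers of 10, binary suffixes as powers of 2), and both function names are hypothetical.

```python
import re

# Assumed bytes-per-unit table; stands in for the SDK's constants module.
_BYTES_PER_UNIT = {
    'K': 10**3, 'M': 10**6, 'G': 10**9, 'T': 10**12, 'P': 10**15, 'E': 10**18,
    'Ki': 2**10, 'Mi': 2**20, 'Gi': 2**30, 'Ti': 2**40, 'Pi': 2**50,
    'Ei': 2**60,
}
_MEMORY_RE = re.compile(r'^([0-9]+)(E|Ei|P|Pi|T|Ti|G|Gi|M|Mi|K|Ki)?$')


def legacy_memory_in_gigabytes(memory: str) -> float:
    """Mimics the removed behavior: validate, then convert to a float in G."""
    match = _MEMORY_RE.match(memory)
    if match is None:
        raise ValueError(f'Invalid memory string: {memory!r}')
    number, unit = match.groups()
    # A bare integer is interpreted as a number of bytes.
    scale = _BYTES_PER_UNIT[unit] if unit else 1
    return float(number) * scale / _BYTES_PER_UNIT['G']


def validated_memory_string(memory: str) -> str:
    """Mimics the new behavior for static values: validate, keep the string."""
    if _MEMORY_RE.match(memory) is None:
        raise ValueError(f'Invalid memory string: {memory!r}')
    return memory


old_value = legacy_memory_in_gigabytes('50G')
new_value = validated_memory_string('50G')
```

Keeping the original string avoids lossy conversions (for example, `'4Gi'` previously became a fractional number of G) and lets the same field carry a placeholder for a run-time value.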
@@ -515,7 +506,10 @@ def set_memory_request(self, memory: str) -> 'PipelineTask':
return self

@block_if_final()
def set_memory_limit(self, memory: str) -> 'PipelineTask':
def set_memory_limit(
self,
memory: Union[str,
pipeline_channel.PipelineChannel]) -> 'PipelineTask':
"""Sets memory limit (maximum) for the task.
Args:
@@ -579,7 +573,9 @@ def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
return self.set_accelerator_type(accelerator)

@block_if_final()
def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
def set_accelerator_type(
self, accelerator: Union[str, pipeline_channel.PipelineChannel]
) -> 'PipelineTask':
"""Sets accelerator type to use when executing this task.
Args:
@@ -589,14 +585,16 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
Self return to allow chained setting calls.
"""
self._ensure_container_spec_exists()
if isinstance(accelerator, pipeline_channel.PipelineChannel):
accelerator = str(accelerator)

if self.container_spec.resources is not None:
self.container_spec.resources.accelerator_type = accelerator
if self.container_spec.resources.accelerator_count is None:
self.container_spec.resources.accelerator_count = 1
self.container_spec.resources.accelerator_count = '1'
else:
self.container_spec.resources = structures.ResourceSpec(
accelerator_count=1, accelerator_type=accelerator)
accelerator_count='1', accelerator_type=accelerator)

return self
