This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add dask plugin #patch #275

Merged 32 commits on Jan 5, 2023

Conversation

bstadlbauer
Member

@bstadlbauer bstadlbauer commented Jul 7, 2022

TL;DR

This PR adds a backend dask plugin using the dask-kubernetes operator to manage the dask cluster lifecycle.

Type

  • Bug Fix
  • Feature
  • Plugin

Are all requirements met?

  • Code completed
  • Smoke tested
  • Unit tests added
  • Code documentation added
  • Any pending items have an associated Issue

Complete description

The plugin works similarly to the existing spark plugin. It uses the DaskJob Custom Resource (docs), whose job runner pod acts as a client that connects to the spun-up cluster. This is similar to how the SparkApplication Custom Resource works for the spark plugin.

TODO before this can be merged:

Tracking Issue

flyteorg/flyte#427

Follow-up issue

NA

@welcome

welcome bot commented Jul 7, 2022

Thank you for opening this pull request! 🙌

These tips will help get your PR across the finish line:

  • Most of the repos have a PR template; if not, fill it out to the best of your knowledge.
  • Sign off your commits (Reference: DCO Guide).

@codecov

codecov bot commented Jul 7, 2022

Codecov Report

Merging #275 (fa0dfa0) into master (9eeb694) will increase coverage by 0.64%.
The diff coverage is 86.14%.

@@            Coverage Diff             @@
##           master     #275      +/-   ##
==========================================
+ Coverage   62.37%   63.02%   +0.64%     
==========================================
  Files         147      148       +1     
  Lines       11816    12148     +332     
==========================================
+ Hits         7370     7656     +286     
- Misses       3882     3912      +30     
- Partials      564      580      +16     
Flag         Coverage Δ
unittests    63.02% <86.14%> (+0.64%) ⬆️

Impacted Files                                Coverage Δ
go/tasks/plugins/k8s/dask/dask.go             83.91% <83.91%> (ø)
go/tasks/pluginmachinery/flytek8s/utils.go    100.00% <100.00%> (ø)

Member Author

@bstadlbauer bstadlbauer left a comment

Hi @hamersaw @eapolinario @pingsutw!

This is a first draft of the dask backend plugin. I mostly wanted to share progress with you here, and I have one small question about it.
It already runs successfully, but the code is still lacking:

  • Configuration options (currently everything is hardcoded) -> A PR for the Python API should come soon in flytekit, this would probably be the best way to discuss the interface?
  • Any sort of testing

Comment on lines 268 to 281
func getJobPodFromJobResource(job *daskAPI.DaskJob, ctx context.Context) (*v1.Pod, error) {
	clientset, err := getClientset()
	if err != nil {
		return nil, err
	}

	jobPodName := job.ObjectMeta.Name + jobRunnerPodPostfix
	jobPodNamespace := job.ObjectMeta.Namespace
	pod, err := clientset.CoreV1().Pods(jobPodNamespace).Get(ctx, jobPodName, metav1.GetOptions{})
	if err != nil {
		return nil, err
	}
	return pod, nil
}
Member Author

@bstadlbauer bstadlbauer Jul 7, 2022

At the moment, the DaskJob CR has no notion of state. To get the state, I currently instantiate a k8s clientset, query for the job pod, and use that to determine the state, similar to how the pod plugin does it.

My questions here would be:

  1. Is this acceptable? From what I've seen, usually only propeller has a notion of a clientset; other plugins rely on the state within the CR provided to the plugin by propeller. If this is not an option, I can try to contribute to the dask-operator; potentially we could save some state in there.
  2. If yes, should there be configuration that allows authenticating using a local kubeconfig, or should we only support in-cluster mode?

Contributor

ideally plugins should not create a new k8s config. This context is better kept within propeller and injected in.

Is it that you want to get the reference to the driver pod? We can do that in a different way

Member Author

@kumare3 Exactly, this would only be used to get the driver pod (JobPod in the dask terminology). The Task Phase would then be based on the state of this pod (similar to how the pod plugin determines the task phase).

Happy to also do this a different way 👍
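
A minimal sketch of the phase mapping described above, assuming the task phase is derived only from the job runner pod's phase. The `TaskPhase` type and its values are hypothetical stand-ins, not the plugin machinery's actual types; only the pod phase strings are the standard Kubernetes ones.

```go
package main

import "fmt"

// PodPhase mirrors the string values of the Kubernetes pod phase field.
type PodPhase string

const (
	PodPending   PodPhase = "Pending"
	PodRunning   PodPhase = "Running"
	PodSucceeded PodPhase = "Succeeded"
	PodFailed    PodPhase = "Failed"
	PodUnknown   PodPhase = "Unknown"
)

// TaskPhase is a hypothetical, simplified stand-in for the phases a plugin
// would report back to propeller; the names are illustrative only.
type TaskPhase string

const (
	TaskQueued    TaskPhase = "Queued"
	TaskRunning   TaskPhase = "Running"
	TaskSucceeded TaskPhase = "Succeeded"
	TaskFailed    TaskPhase = "Failed"
	TaskUndefined TaskPhase = "Undefined"
)

// taskPhaseFromJobPod derives the task phase from the job runner pod's phase.
func taskPhaseFromJobPod(phase PodPhase) TaskPhase {
	switch phase {
	case PodPending:
		return TaskQueued
	case PodRunning:
		return TaskRunning
	case PodSucceeded:
		return TaskSucceeded
	case PodFailed:
		return TaskFailed
	default:
		// Covers "Unknown" and any value this sketch does not recognize.
		return TaskUndefined
	}
}

func main() {
	for _, p := range []PodPhase{PodPending, PodRunning, PodSucceeded, PodFailed, PodUnknown} {
		fmt.Printf("%s -> %s\n", p, taskPhaseFromJobPod(p))
	}
}
```

Keeping the mapping in a pure function like this makes it easy to unit test without a cluster, whichever component ends up fetching the pod.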

@hamersaw
Contributor

This is awesome! Thanks for working on it; I see it's been a bit of effort in different repos as well. One question: in doing some reading on the dask operator here, it sounds like once the DaskJob CRD is created, control is handed over to the Job Runner Pod (hope the terminology is correct), so much so that "Once the job Pod runs to completion the cluster is removed automatically". This is why you are forced to retrieve the JobPod to gather the status of the job: although a bit unusual for CRDs, the status is not tracked at the top level. Does aborting the JobPod shut down the cluster as well? If so, FlytePropeller uses the BuildIdentityResource function for checking status, aborting, and finalizing the task. Maybe we could set this to the JobPod rather than the DaskJob?

@bstadlbauer bstadlbauer force-pushed the add-dask-plugin branch 2 times, most recently from d4b5a00 to dfe92b6 Compare September 10, 2022 06:47
@bstadlbauer
Member Author

Hi @hamersaw!

Sorry for the slow progress on this, but the last time I tried to continue, I ran into weird issues with my Flyte dev setup on my private machine (flytectl would segfault upon registering). I have found a workaround now and have had a closer look at your proposal.
I've tried switching the IdentityResource from the DaskJob CRD to the Job Runner Pod. However, I saw two problems with this approach:

  1. The resource's name is set to the node name in propeller when adding the object's metadata (here). However, the dask operator appends "-runner" to that name (here) when creating the Job Runner Pod (which is not configurable at the moment).
  2. When the plugin first creates a DaskJob CRD in BuildResource() but then uses the Job Runner Pod as the Identity Resource, we could end up in a state where the CRD has been created but the dask operator has not created the pod yet, leading to an error in propeller because it won't be able to find the Identity Resource, right?

I might be missing something here, though. Otherwise, I would propose that I try to add state to the CRD on the dask side, so that we can then do something similar to the other k8s plugins.
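
To make the two problems concrete, here is a rough sketch, assuming the "-runner" suffix mentioned above and treating a missing pod as "not created yet" rather than as an error. The `podPhaseLookup` type, `errPodNotFound`, and `fakeCluster` are hypothetical stand-ins for a real Kubernetes client and its not-found error; this is not how propeller or the plugin actually resolves the Identity Resource.

```go
package main

import (
	"errors"
	"fmt"
)

// jobRunnerPodPostfix is the suffix the dask operator appends to the job name.
const jobRunnerPodPostfix = "-runner"

// errPodNotFound is a hypothetical sentinel standing in for a Kubernetes
// "not found" API error.
var errPodNotFound = errors.New("pod not found")

// podPhaseLookup is a hypothetical stand-in for a Kubernetes client call that
// returns the phase of the named pod.
type podPhaseLookup func(namespace, name string) (string, error)

// runnerPodName derives the Job Runner Pod name from the DaskJob name.
func runnerPodName(jobName string) string {
	return jobName + jobRunnerPodPostfix
}

// runnerPodPhase reports "Pending" while the operator has not yet created the
// pod, and passes any other lookup error through unchanged.
func runnerPodPhase(lookup podPhaseLookup, namespace, jobName string) (string, error) {
	phase, err := lookup(namespace, runnerPodName(jobName))
	if errors.Is(err, errPodNotFound) {
		return "Pending", nil
	}
	if err != nil {
		return "", err
	}
	return phase, nil
}

// fakeCluster builds a lookup backed by an in-memory map keyed by "namespace/name".
func fakeCluster(pods map[string]string) podPhaseLookup {
	return func(namespace, name string) (string, error) {
		phase, ok := pods[namespace+"/"+name]
		if !ok {
			return "", errPodNotFound
		}
		return phase, nil
	}
}

func main() {
	lookup := fakeCluster(map[string]string{"dev/node-a-runner": "Running"})
	for _, job := range []string{"node-a", "node-b"} {
		phase, err := runnerPodPhase(lookup, "dev", job)
		fmt.Println(runnerPodName(job), phase, err)
	}
}
```

Tolerating the not-found case works around the race, but it cannot distinguish "not created yet" from "deleted", which is one reason tracking state on the CRD itself is the cleaner option.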

@bstadlbauer bstadlbauer changed the title Add dask plugin #minor WIP: Add dask plugin #minor Sep 11, 2022
@bstadlbauer bstadlbauer changed the title WIP: Add dask plugin #minor Add dask plugin #minor Nov 14, 2022
@bstadlbauer bstadlbauer marked this pull request as ready for review November 14, 2022 14:35
Contributor

@hamersaw hamersaw left a comment

The only big thing I notice is figuring out resources. I don't like that there is parsing-specific code (i.e. protobuf to k8s) here. It looks like this functionality has been implemented in FlytePropeller, which unfortunately cannot be included as a dependency here.

IIUC this uses the task-defined resources as defaults for everything and then allows overriding through the configuration. I think this is a sane way of doing it, but it should certainly be documented. I think the two options are:
(1) refactor flytepropeller to move the code to flyteplugins
(2) use the k8s version of resources as part of the configuration. I think we would have to unmarshal them similar to how we do podspecs in the pod plugin. Honestly, I do not know everything about what this would look like in flytekit, but presumably we could then use the k8s Resource objects as part of the flytekit definition as well.

I'm not sure what is a better route here. Thoughts?
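
As a rough illustration of the "task-defined resources as defaults, overridable through configuration" behaviour described above, here is a minimal sketch. The `Resources` type and `withDefaults` helper are hypothetical, and the per-key merge is an assumption; the plugin may instead replace the whole resource set when an override is present.

```go
package main

import "fmt"

// Resources maps a resource name (for example "cpu" or "memory") to a
// quantity string. It is a hypothetical simplification of the Kubernetes
// resource list type.
type Resources map[string]string

// withDefaults returns the task-level defaults with any component-specific
// overrides applied on top. Neither input is modified.
func withDefaults(defaults, overrides Resources) Resources {
	merged := make(Resources, len(defaults)+len(overrides))
	for name, quantity := range defaults {
		merged[name] = quantity
	}
	for name, quantity := range overrides {
		merged[name] = quantity
	}
	return merged
}

func main() {
	taskDefaults := Resources{"cpu": "1", "memory": "2Gi"}
	workerOverrides := Resources{"memory": "8Gi"}

	// The scheduler has no overrides here, so it inherits the task defaults.
	fmt.Println("scheduler:", withDefaults(taskDefaults, nil))
	fmt.Println("workers:  ", withDefaults(taskDefaults, workerOverrides))
}
```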

Signed-off-by: Bernhard Stadlbauer <[email protected]>
@bstadlbauer
Member Author

@hamersaw Good point about the resource parsing, I've refactored this out of flytepropeller into flyteplugins in flyteorg/flytepropeller@fc99a5d and 6be8764 respectively

Signed-off-by: Bernhard Stadlbauer <[email protected]>
Contributor

@hamersaw hamersaw left a comment

Once we get the FlyteIDL 1.4.0 -> 1.3.x fixed, let's go ahead and merge!

Signed-off-by: Bernhard Stadlbauer <[email protected]>
@bstadlbauer bstadlbauer changed the title Add dask plugin #minor Add dask plugin #patch Jan 5, 2023
Signed-off-by: Bernhard Stadlbauer <[email protected]>
@bstadlbauer bstadlbauer requested a review from hamersaw January 5, 2023 08:19
@bstadlbauer
Member Author

@hamersaw I've reordered the imports and changed the flyteidl version to 1.3.2. I've also removed the #minor from the PR title (in case that's what triggers the release).

@hamersaw hamersaw merged commit 36c0b41 into flyteorg:master Jan 5, 2023
@welcome

welcome bot commented Jan 5, 2023

Congrats on merging your first pull request! 🎉

eapolinario pushed a commit that referenced this pull request Sep 6, 2023
* First working version with DaskCluster

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update plugin

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add container customization

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add correct `getTaskPhase`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Refactor dask.go

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Use new dask operator which includes a status

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add first tests and use data from flyteidl

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Refactor tests

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add support for custom namespace

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add support for passing on annotations

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add support for env vars

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add default container logic to job runner

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add TestGetTaskPhaseDask

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add logs to task info

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Fix

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Use tagged version of dask go operator

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Fix linting issues

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Refactor `ToK8sResourceRequirements`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Use platform resources by default

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Fix incorrect resources

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Remove namespace

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Don't restart job runner and scheduler

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Run `go mod tidy` after rebase

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Run formatter

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update to new `flyteidl`

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Add support for interruptible workers

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update flytekit to 1.4.0

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Fix linting errors

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Update `flyteidl` to 1.3.2

Signed-off-by: Bernhard Stadlbauer <[email protected]>

* Reorder imports

Signed-off-by: Bernhard Stadlbauer <[email protected]>

Signed-off-by: Bernhard Stadlbauer <[email protected]>
3 participants