Implement DaskJob resource #483

Closed · jacobtomlinson opened this issue May 13, 2022 · 3 comments · Fixed by #504

@jacobtomlinson (Member) commented May 13, 2022

In KubeFlow there are many operators that allow users to run training jobs as one-shot tasks. For example, the PyTorchJob CRD creates a cluster of pods that run a training task to completion and then cleans up when it is done.

We should add something similar to the operator here so that folks will have familiar tools. We can reuse the DaskCluster CRD within the DaskJob (nesting this will be trivial thanks to the work by @samdyzon in #452).

I've thought about a few approaches (see alternatives in the details below) but this is my preferred option:

  • User creates DaskJob with nested Job spec and DaskCluster spec.
  • Operator creates Job resource that runs the client code and a DaskCluster resource that will be leveraged by the client code.
  • When the Job creates its Pod (this is done by the kubelet) the operator adopts the DaskCluster to the Pod so that it will be cascade deleted on completion of the Job.

This approach will only support non-parallel jobs.
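To make the preferred option above concrete, here is a rough sketch of what submitting such a DaskJob might look like with the Kubernetes Python client. The API group is assumed to match the existing DaskCluster CRD, and the kind and field names (`spec.job`, `spec.cluster`) are purely illustrative; the real schema would be settled in the implementing PR.

```python
# Purely illustrative sketch of a DaskJob resource with a nested Job (pod) spec
# and DaskCluster spec. The API group/kind/field names are assumptions, not the
# final schema.
from kubernetes import client, config

dask_job = {
    "apiVersion": "kubernetes.dask.org/v1",  # assumed to match the DaskCluster CRD
    "kind": "DaskJob",
    "metadata": {"name": "example-job"},
    "spec": {
        # One-shot client code; the operator would wrap this in a Job resource
        "job": {
            "spec": {
                "containers": [
                    {
                        "name": "client",
                        "image": "ghcr.io/dask/dask:latest",
                        "args": ["python", "/scripts/train.py"],
                    }
                ],
                "restartPolicy": "Never",
            }
        },
        # Nested DaskCluster spec, reused as-is from the existing CRD
        "cluster": {
            "spec": {
                "worker": {
                    "replicas": 3,
                    "spec": {
                        "containers": [
                            {"name": "worker", "image": "ghcr.io/dask/dask:latest"}
                        ]
                    },
                },
                "scheduler": {
                    "spec": {
                        "containers": [
                            {"name": "scheduler", "image": "ghcr.io/dask/dask:latest"}
                        ]
                    },
                },
            }
        },
    },
}

config.load_kube_config()
client.CustomObjectsApi().create_namespaced_custom_object(
    "kubernetes.dask.org", "v1", "default", "daskjobs", dask_job
)
```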

Alternative approaches that I have discounted

One way to implement this would be to reuse as many existing resources and behaviours as possible, but it would require access to the Jobs API and would need the client code to tolerate waiting for the cluster to start.

  • User creates DaskJob with nested Job spec and DaskCluster spec.
  • Operator creates Job resource that runs the client code.
  • When the Job creates a Pod the operator creates a DaskCluster resource and adopts it to the Pod.
  • When the Job finishes and the Pod is removed the DaskCluster will be cascade deleted automatically.

We could also create the DaskCluster and Job at the same time or the DaskCluster first then the Job.
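In both the preferred option and this alternative, the "adoption" step amounts to setting an ownerReference on the DaskCluster that points at the Job's Pod, so Kubernetes garbage collection cascade-deletes the cluster when the Pod goes away. A minimal sketch with the Kubernetes Python client (the resource names and the `daskclusters` plural here are placeholders):

```python
# Sketch of adopting a DaskCluster to a Pod via an ownerReference so the cluster
# is garbage collected when the Pod is deleted. Names are placeholders.
from kubernetes import client, config

config.load_kube_config()
pod = client.CoreV1Api().read_namespaced_pod("example-job-abc12", "default")

owner_patch = {
    "metadata": {
        "ownerReferences": [
            {
                "apiVersion": "v1",
                "kind": "Pod",
                "name": pod.metadata.name,
                "uid": pod.metadata.uid,
                "blockOwnerDeletion": True,
                "controller": False,
            }
        ]
    }
}

client.CustomObjectsApi().patch_namespaced_custom_object(
    "kubernetes.dask.org", "v1", "default", "daskclusters",
    "example-job-cluster", owner_patch,
)
```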

Alternatively, we could reimplement some of what a Job does in the operator, which would give us a little more control over the startup order and require less of the API.

  • User creates DaskJob with nested Pod spec and DaskCluster spec.
  • Operator creates the DaskCluster resource.
  • When the cluster is running the operator creates a Pod from the spec that runs the client code.
  • The operator polls the Pod to wait for it to finish.
  • When the Pod is done the Pod and DaskCluster resources are deleted.
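For this variant, the polling step is straightforward with the Kubernetes Python client. A rough sketch follows; names are placeholders, and a real operator would use event handlers or timers rather than a blocking loop:

```python
# Rough sketch of polling the client Pod until it reaches a terminal phase and
# then cleaning up. Names are placeholders; a real operator would not block.
import time

from kubernetes import client, config

config.load_kube_config()
core = client.CoreV1Api()

while True:
    pod = core.read_namespaced_pod("example-job-client", "default")
    if pod.status.phase in ("Succeeded", "Failed"):
        break
    time.sleep(5)

# Delete the client Pod and the DaskCluster resource once the work is done
core.delete_namespaced_pod("example-job-client", "default")
client.CustomObjectsApi().delete_namespaced_custom_object(
    "kubernetes.dask.org", "v1", "default", "daskclusters", "example-job-cluster"
)
```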

We probably also want to support a nested autoscaler resource in the DaskJob, so we will need #451 to be merged.

@bstadlbauer (Collaborator) commented

Hi @jacobtomlinson!

Up front, thanks for the great work on the operator; it should make life easier on multiple fronts! I've already played around with it a bit and so far things are looking really good 👍 For a bit of context, I am currently working on integrating Dask into Flyte by providing a Dask Flyte task which manages the ephemeral Dask cluster lifecycle. There are already similar plugins for Spark, etc.

I've already managed to create a flytekit (i.e. pure Python) plugin (the code is not public yet) which utilizes experimental.KubeCluster to spin up and then delete the Dask cluster. It would be really nice to convert this to a backend (Go) plugin (which would create the required CRD), but that would require the DaskJob resource.
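For context, the lifecycle that plugin manages looks roughly like the sketch below. This is not the actual plugin code (which is not public), and the KubeCluster arguments shown are assumptions about the experimental API:

```python
# Simplified sketch of an ephemeral Dask cluster lifecycle managed from Python,
# in the spirit of the flytekit plugin described above. The KubeCluster
# arguments are assumptions about the experimental API, not the plugin itself.
from dask.distributed import Client
from dask_kubernetes.experimental import KubeCluster


def run_on_ephemeral_cluster(fn, *args):
    cluster = KubeCluster(name="flyte-task-cluster", n_workers=3)
    try:
        with Client(cluster) as dask_client:
            return dask_client.submit(fn, *args).result()
    finally:
        # Tear the cluster down once the task has finished
        cluster.close()
```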

So TL;DR: If there is anything I can help with on this issue, just let me know

@jacobtomlinson (Member, Author) commented

Glad the recent work is useful to you! I'm excited to have folks trying it out. Implementing DaskJob is on my task list for this week, although I do want to try and land #451 first, as I also want to nest the autoscaling resource within DaskJob so that it can be adaptive.

I don't know anything about Flyte, but I'm curious: what features do you need from the DaskJob CRD that you can't get from the DaskCluster CRD? Does it specifically need a Job-style resource rather than a Deployment-style resource?

@bstadlbauer (Collaborator) commented

Sounds good! If we see any errors, we'll try to open PRs whenever we can 👍

I am by no means an expert on backend Flyte plugins, but the way these work is that they construct a K8s resource (of any kind), submit that resource to K8s, and then watch its state. The Go interface a plugin needs to implement is here; an example implementation (a Spark plugin) can be found here. The Spark plugin uses the Spark operator and submits a SparkApplication CRD, which (as I understand it) contains both the cluster specification and the job that needs to run.
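In Kubernetes terms, that "submit the resource and watch its state" pattern applied to a future DaskJob could look something like the sketch below; the status field is invented here, since the CRD does not exist yet:

```python
# Sketch of the "submit a custom resource, then watch its state" pattern a
# backend plugin would use, applied to a hypothetical DaskJob. The
# "status.jobStatus" field is invented for illustration only.
from kubernetes import client, config, watch

config.load_kube_config()
crds = client.CustomObjectsApi()

w = watch.Watch()
for event in w.stream(
    crds.list_namespaced_custom_object,
    "kubernetes.dask.org", "v1", "default", "daskjobs",
):
    obj = event["object"]
    if obj["metadata"]["name"] != "example-job":
        continue
    if obj.get("status", {}).get("jobStatus") in ("Successful", "Failed"):
        w.stop()
```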

This is the corresponding Dask-related issue in Flyte: flyteorg/flyte#427
