[Backend][Plugin] Support for Dask clustered tasks in Flyte #427

Closed · 1 of 6 tasks
kumare3 opened this issue Jul 22, 2020 · 13 comments
Labels: enhancement (New feature or request), help wanted (Extra attention is needed)

Comments
kumare3 (Contributor) commented Jul 22, 2020

Why would this plugin be helpful to the Flyte community
Users could write very short-running distributed array jobs using Dask. This makes it possible to multiplex many small-runtime jobs onto the same set of nodes.
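For a concrete picture of what such a short array job could look like, here is a purely illustrative sketch using plain dask.distributed; Client() stands in for whatever cluster a plugin would provision:

from dask.distributed import Client

def square(x: int) -> int:
    return x * x

# Illustrative only: Client() starts a local in-process cluster here; under
# the proposed plugin it would instead attach to the provisioned cluster.
client = Client()
futures = client.map(square, range(100))  # fan out many tiny units of work
print(sum(client.gather(futures)))        # 328350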

Type of Plugin

  • Web Service (e.g. AWS Sagemaker, GCP DataFlow, Qubole etc...)
  • Kubernetes Operator (e.g. TfOperator, SparkOperator, FlinkK8sOperator, etc...)
  • Customized Plugin using native kubernetes constructs
  • Other

Can you help us with the implementation?

  • Yes
  • No

Additional context
This would really help express workloads that are not a natural fit for Spark, yet too lightweight for Flyte batch jobs.

kumare3 added the enhancement (New feature or request) and help wanted (Extra attention is needed) labels and removed the untriaged label on Jul 22, 2020
kumare3 changed the title from "[Backend][Plugin] Support for DaskOperator in Flyte" to "[Backend][Plugin] Support for Dask clustered tasks in Flyte" on Dec 27, 2021
kumare3 (Contributor, Author) commented Dec 27, 2021

Dask can be deployed to Kubernetes; the template is shown here. Allowing this would help users a lot and enable writing really short tasks. Coupled with cluster re-use (coming later) or cluster gateways (daskhub), plus support for a Coiled task in the future, this would let users adopt Dask more effectively and make Flyte + Dask work well together.

@task(
    config=Dask(
        workers=4,
        worker_resources=...,
        # worker_pod_template=...  # optional; the worker command should probably be hard-coded client side
    ),
    resources=Resources(...),  # driver resources
)
def my_dask_program():
    pass
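For illustration, the task body under this proposal might then look roughly like the sketch below. This is an assumption, not a final API; in particular, the DASK_SCHEDULER_ADDRESS environment variable is a hypothetical way for the plugin to hand the pod its scheduler endpoint.

import os
from dask.distributed import Client

@task(config=Dask(workers=4))
def my_dask_program() -> float:
    # Hypothetical wiring: if the plugin injects a scheduler address, connect
    # to it; with the variable unset this falls back to a local cluster.
    client = Client(os.environ.get("DASK_SCHEDULER_ADDRESS"))
    total = client.submit(sum, range(1_000)).result()
    return float(total)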

ossareh (Contributor) commented Mar 31, 2022

This was discussed a little on slack: https://app.slack.com/client/TN89P6GGK/CNMKCU6FR/thread/CNMKCU6FR-1648660418.322249

Currently we do not use Dask, nor do we use Coiled's hosted platform for it. However, both are really interesting to us as a path to migrating away from our current, home-rolled workflow orchestration solution and having someone else run our workloads for us.

The primary interest in Dask is its drop-in nature w.r.t. dataframes and NumPy arrays. We currently employ a streaming solution (using mmap) that lets us do this work on one node; being able to scale out to multiple nodes without code changes in dependent areas of our project would be an instant win for us.
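To illustrate that drop-in point (illustrative only; the path and column names are made up), the Dask dataframe API mirrors pandas almost line for line, and only .compute() decides where the work runs:

import dask.dataframe as dd

# Nearly the same lines as the pandas version, but lazily partitioned;
# .compute() materializes the result on whatever cluster is attached.
df = dd.read_csv("s3://bucket/events-*.csv")  # hypothetical input
daily = df.groupby("day")["value"].mean().compute()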

From an integration point of view, I found the Dask + Prefect video informative.

In that video, a couple of things stand out to me as desirable from any integration (the first two are sketched after the list):

  • Ability to use local Dask, i.e. the development workflow; we need this
  • Ability to target a remote Dask cluster, i.e. your suggestion above
  • Ability to use a Coiled cluster as the remote cluster, with setup handled by the coiled package; this is what I'm really interested in!
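A minimal sketch of the first two modes; the same code, only the address changes (the endpoint below is hypothetical):

from dask.distributed import Client

# Development: an in-process local cluster, no infrastructure needed.
client = Client()

# Remote: point the same code at an existing cluster's scheduler
# (a Coiled or daskhub gateway would hand out this address instead).
# client = Client("tcp://dask-scheduler.example.com:8786")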

Unfortunately I don't have much more to add than opinions currently; however, as we rework our stack to work with Flyte, there's a high chance we find ourselves going down this path, in which case we'll keep you apprised.

kumare3 (Contributor, Author) commented Apr 1, 2022

This is a great summary; let me chalk out the effort and see when we can accommodate this.

kumare3 (Contributor, Author) commented Apr 1, 2022

Also, we can always start with a flytekit plugin, which should be simple.

@kumare3 kumare3 modified the milestones: 1.1.0 - Hawk, 1.0.1 Apr 1, 2022
bstadlbauer (Member) commented

@kumare3 Quick update on this: we are working on a flytekit plugin and have a working prototype. We will test it out in the upcoming week(s), and if it works we will open a PR to add it.

I've also looked into creating a backend plugin and have a working prototype capable of managing the cluster lifecycle. Currently this is waiting on dask/dask-kubernetes#483 (basically a job CRD, similar to the SparkApplication CRD).

kumare3 (Contributor, Author) commented May 24, 2022

@bstadlbauer this is awesome. Please let us know how we can help. There is some momentum now in adding Flyte + Ray support, and we will also be working on reusing a Ray cluster across multiple tasks in a Flyte workflow. Once you have your Dask plugin, we will start modifying things towards this common way of reusing clusters.

bstadlbauer (Member) commented

@kumare3 Great, thank you! Reusing clusters would be super helpful!

I've looked at the Spark plugin as an example and wanted to confirm something:
As far as I understand it, the plugin watches one K8s object, which requires that this object also include the client code (e.g. a pod interacting with the cluster), right? In the Spark case this is the SparkApplication CRD, which submits the Spark job.
I'm asking because at the moment the Dask operator only offers to set up the cluster (scheduler + workers) through a CRD, and does not run any user code yet. That will be added in dask/dask-kubernetes#483, but it would mean the Flyte backend plugin has to wait for it to be ready, right?
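To make the distinction concrete, here is a rough sketch of what submitting such a combined resource could look like once dask/dask-kubernetes#483 lands. The group/version/kind and the spec layout are assumptions based on that proposal, not a final CRD:

from kubernetes import client, config

config.load_kube_config()
api = client.CustomObjectsApi()

dask_job = {
    "apiVersion": "kubernetes.dask.org/v1",  # assumed group/version
    "kind": "DaskJob",                       # assumed kind, per dask/dask-kubernetes#483
    "metadata": {"name": "flyte-task-example"},
    "spec": {
        "job": {},      # pod spec that runs the user (client) code
        "cluster": {},  # scheduler + worker groups, as the operator creates today
    },
}
api.create_namespaced_custom_object(
    group="kubernetes.dask.org", version="v1",
    namespace="default", plural="daskjobs", body=dask_job,
)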

kumare3 (Contributor, Author) commented May 25, 2022

@bstadlbauer the backend plugin is flexible. Spark is peculiar in that it starts the cluster and runs the app. We actually prefer that you can run a separate driver, as that can speed up Flyte even more and give fantastic control (a lesson learnt through many issues with Spark).

Flyte can run the user code as a separate pod and then monitor it. This also helps with reuse.

bstadlbauer (Member) commented

@kumare3 Oh that's nice! Is there a plugin that does this already?

kumare3 (Contributor, Author) commented May 25, 2022

Not today, but we are working on the Ray plugin. Let me add you to a Slack thread.

bstadlbauer (Member) commented

Quick update:

  • I've started a small repo with a Go client for the k8s Dask operator
  • I've started a flyteplugins branch which adds the plugin. At the moment it holds a very rough draft that uses the DaskJob resource (similar to the SparkApplication resource) to deploy the cluster and run the code
  • I've started a flytekit branch, which at the moment only has a minimal skeleton that enables testing the backend plugin

@hamersaw hamersaw modified the milestones: 1.0.1, 1.2.0 Jul 21, 2022
@eapolinario eapolinario modified the milestones: 1.2.0, 1.3.0 Sep 19, 2022
bstadlbauer (Member) commented Dec 12, 2022

Quick update from my end: I had some time this weekend to finish things. Sorry for this taking so long; the last few weeks have been quite busy.

Overall, this would be the order in which the PRs need to go in:

  1. Add inital dask plugin IDL #minor flyteidl#339
  2. Add dask plugin #patch flyteplugins#275
  3. Add dask plugin flytepropeller#508
  4. Add dask plugin #patch flytekit#1366
  5. Add dask operator #3145
  6. Dask plugin docs flytesnacks#929

@kumare3 @hamersaw

@cosmicBboy cosmicBboy modified the milestones: 1.3.0, 2023 Q1 Backlog Jan 25, 2023
cosmicBboy (Contributor) commented

All PRs are in, closing this task. Thanks again for this awesome contribution @bstadlbauer! 🚀
