Skip to content

Commit

Permalink
Packages fasttask into a python wheel (#316)
Browse files Browse the repository at this point in the history
## Overview
This PR packages fast-task into a Python wheel, which bundles the executor and the bridge.

## Test Plan
I followed the devx in https://unionai.atlassian.net/wiki/spaces/ENG/pages/653721617/Fast+Task+DevX 

For https://unionai.atlassian.net/wiki/spaces/ENG/pages/653721617/Fast+Task+DevX#DevX-for-fasttask-Worker, the difference is:

```
pip install . -vv
unionai-bridge --queue-id=bar --fast-register-dir-override /tmp/fasttask-test
```

For https://unionai.atlassian.net/wiki/spaces/ENG/pages/653721617/Fast+Task+DevX#DevX-Execution-Environment-Service, you need to build an image with the wheel install. You can do this with the Dockerfile in this PR

```python
docker image build -t fasttask-builder:0.0.10 .

# Move to k3d
k3d image import --cluster=flyte fasttask-builder:0.0.10
```

Then update your workflow to use `container_image=fasttask-builder:0.0.10`

## Rollout Plan (if applicable)

1. Ship the new entrypoints with `unionai` 
2. Ask current customers to upgrade `unionai`.
    - If customers are okay with upgrading, ship the plugin code update in this PR.
    - If customers are not okay with upgrading, then make the plugin backward compatible. (Detect if `unionai-actor-bridge` is available. If not, switch back to using init container)

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed to OSS

## Issue
ref https://linear.app/unionai/issue/COR-983/install-fasttask-rust-worker-within-unionai-repo

## Checklist
* [ ] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
thomasjpfan authored Jun 20, 2024
1 parent 62727bd commit 4fe2fde
Show file tree
Hide file tree
Showing 35 changed files with 1,475 additions and 625 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,7 @@
"gopls": {
"formatting.local": "github.com/flyteorg"
},
"rust-analyzer.linkedProjects": [
"fasttask/worker/Cargo.toml"
]
}
4 changes: 2 additions & 2 deletions fasttask/buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ plugins:
- plugin: buf.build/grpc/go:v1.3.0
out: plugin
- plugin: buf.build/community/neoeinstein-prost:v0.2.3
out: worker/src/pb
out: worker/bridge/src/pb
- plugin: buf.build/community/neoeinstein-tonic:v0.3.0
out: worker/src/pb
out: worker/bridge/src/pb
40 changes: 1 addition & 39 deletions fasttask/plugin/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,43 +219,10 @@ func (i *InMemoryEnvBuilder) createPod(ctx context.Context, fastTaskEnvironmentS
}
objectMeta.Annotations[TTL_SECONDS] = fmt.Sprintf("%d", fastTaskEnvironmentSpec.GetTtlSeconds())

// create new volume 'workdir'
podSpec.Volumes = append(podSpec.Volumes, v1.Volume{
Name: "workdir",
VolumeSource: v1.VolumeSource{
EmptyDir: &v1.EmptyDirVolumeSource{},
},
})

// add init container to copy worker binary to volume
podSpec.InitContainers = append(podSpec.InitContainers, v1.Container{
Name: "worker",
Image: GetConfig().Image,
ImagePullPolicy: v1.PullIfNotPresent,
Command: []string{"cp", "/usr/local/bin/worker", "/tmp/worker"},
Resources: v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: GetConfig().InitContainerCPU,
v1.ResourceMemory: GetConfig().InitContainerMemory,
},
Requests: v1.ResourceList{
v1.ResourceCPU: GetConfig().InitContainerCPU,
v1.ResourceMemory: GetConfig().InitContainerMemory,
},
},
VolumeMounts: []v1.VolumeMount{
{
Name: "workdir",
MountPath: "/tmp",
},
},
})

// update primary container arguments and volume mounts
container := &podSpec.Containers[primaryContainerIndex]
container.Args = []string{
"/tmp/worker",
"bridge",
"unionai-actor-bridge",
}

// append additional worker args before plugin args to ensure they are overridden
Expand All @@ -275,11 +242,6 @@ func (i *InMemoryEnvBuilder) createPod(ctx context.Context, fastTaskEnvironmentS
GetConfig().CallbackURI,
)

container.VolumeMounts = append(container.VolumeMounts, v1.VolumeMount{
Name: "workdir",
MountPath: "/tmp",
})

// use kubeclient to create worker
return i.kubeClient.GetClient().Create(ctx, &v1.Pod{
ObjectMeta: *objectMeta,
Expand Down
32 changes: 11 additions & 21 deletions fasttask/plugin/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,13 @@ package plugin
import (
"time"

"k8s.io/apimachinery/pkg/api/resource"

pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flytestdlib/config"
)

//go:generate pflags Config --default-var=defaultConfig

var (
defaultCPU = resource.MustParse("500m")
defaultMemory = resource.MustParse("128Mi")
defaultConfig = &Config{
CallbackURI: "http://host.k3d.internal:15605",
Endpoint: "0.0.0.0:15605",
Expand All @@ -23,9 +19,6 @@ var (
GracePeriodStatusNotFound: config.Duration{Duration: time.Second * 90},
GracePeriodWorkersUnavailable: config.Duration{Duration: time.Second * 30},
HeartbeatBufferSize: 512,
Image: "flyteorg/fasttask:latest",
InitContainerCPU: defaultCPU,
InitContainerMemory: defaultMemory,
NonceLength: 12,
TaskStatusBufferSize: 512,
AdditionalWorkerArgs: []string{},
Expand All @@ -35,20 +28,17 @@ var (
)

type Config struct {
CallbackURI string `json:"callback-uri" pflag:",Fasttask gRPC service URI that fasttask workers will connect to."`
Endpoint string `json:"endpoint" pflag:",Fasttask gRPC service endpoint."`
EnvDetectOrphanInterval config.Duration `json:"env-detect-orphan-interval" pflag:",Frequency that orphaned environments detection is performed."`
EnvGCInterval config.Duration `json:"env-gc-interval" pflag:",Frequency that environments are GCed in case of TTL expirations."`
EnvRepairInterval config.Duration `json:"env-repair-interval" pflag:",Frequency that environments are repaired in case of external modifications (ex. pod deletion)."`
GracePeriodStatusNotFound config.Duration `json:"grace-period-status-not-found" pflag:",The grace period for a task status to be reported before the task is considered failed."`
GracePeriodWorkersUnavailable config.Duration `json:"grace-period-workers-unavailable" pflag:",The grace period for a worker to become available before the task is considered failed."`
HeartbeatBufferSize int `json:"heartbeat-buffer-size" pflag:",The size of the heartbeat buffer for each worker."`
Image string `json:"image" pflag:",Fasttask image to wrap the task execution with."`
InitContainerCPU resource.Quantity `json:"init-container-cpu" pflag:",The default cpu request / limit for the init container used to inject the fasttask worker binary."`
InitContainerMemory resource.Quantity `json:"init-container-memory" pflag:",The default memory request / limit for the init container used to inject the fasttask worker binary."`
NonceLength int `json:"nonce-length" pflag:",The length of the nonce value to uniquely link a fasttask replica to the environment instance, ensuring fast turnover of environments regardless of cache freshness."`
TaskStatusBufferSize int `json:"task-status-buffer-size" pflag:",The size of the task status buffer for each task."`
AdditionalWorkerArgs []string `json:"additional-worker-args" pflag:",Additional arguments to pass to the fasttask worker binary."`
CallbackURI string `json:"callback-uri" pflag:",Fasttask gRPC service URI that fasttask workers will connect to."`
Endpoint string `json:"endpoint" pflag:",Fasttask gRPC service endpoint."`
EnvDetectOrphanInterval config.Duration `json:"env-detect-orphan-interval" pflag:",Frequency that orphaned environments detection is performed."`
EnvGCInterval config.Duration `json:"env-gc-interval" pflag:",Frequency that environments are GCed in case of TTL expirations."`
EnvRepairInterval config.Duration `json:"env-repair-interval" pflag:",Frequency that environments are repaired in case of external modifications (ex. pod deletion)."`
GracePeriodStatusNotFound config.Duration `json:"grace-period-status-not-found" pflag:",The grace period for a task status to be reported before the task is considered failed."`
GracePeriodWorkersUnavailable config.Duration `json:"grace-period-workers-unavailable" pflag:",The grace period for a worker to become available before the task is considered failed."`
HeartbeatBufferSize int `json:"heartbeat-buffer-size" pflag:",The size of the heartbeat buffer for each worker."`
NonceLength int `json:"nonce-length" pflag:",The length of the nonce value to uniquely link a fasttask replica to the environment instance, ensuring fast turnover of environments regardless of cache freshness."`
TaskStatusBufferSize int `json:"task-status-buffer-size" pflag:",The size of the task status buffer for each task."`
AdditionalWorkerArgs []string `json:"additional-worker-args" pflag:",Additional arguments to pass to the fasttask worker binary."`
}

func GetConfig() *Config {
Expand Down
3 changes: 0 additions & 3 deletions fasttask/plugin/config_flags.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 0 additions & 42 deletions fasttask/plugin/config_flags_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions fasttask/plugin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ toolchain go1.21.4
require (
github.com/flyteorg/flyte/flyteidl v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flyteplugins v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000
github.com/flyteorg/flyte/flytestdlib v0.0.0-00010101000000-000000000000
github.com/golang/protobuf v1.5.3
github.com/mitchellh/mapstructure v1.5.0
Expand Down Expand Up @@ -44,14 +43,14 @@ require (
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/flyteorg/flyte/flytepropeller v0.0.0-00010101000000-000000000000 // indirect
github.com/flyteorg/stow v0.3.10 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-test/deep v1.0.7 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand Down Expand Up @@ -117,7 +116,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.28.4 // indirect
k8s.io/client-go v0.28.4 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
Expand Down
5 changes: 5 additions & 0 deletions fasttask/worker/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
target
build
dist

Dockerfile*
166 changes: 166 additions & 0 deletions fasttask/worker/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
.ruff_cache

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
.pybuilder/
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock

# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml

# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# pytype static type analyzer
.pytype/

# Cython debug symbols
cython_debug/

# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/

# Version file is auto-generated by setuptools_scm
.DS_Store

target
Loading

0 comments on commit 4fe2fde

Please sign in to comment.