Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

feat: add support for inter-cluster communication for Ray plugin #321

Merged
merged 7 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions go/tasks/pluginmachinery/k8s/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package k8s

import (
"fmt"
"io/ioutil"

"github.com/pkg/errors"
restclient "k8s.io/client-go/rest"
)

type ClusterConfig struct {
Name string `json:"name" pflag:",Friendly name of the remote cluster"`
Endpoint string `json:"endpoint" pflag:", Remote K8s cluster endpoint"`
Auth Auth `json:"auth" pflag:"-, Auth setting for the cluster"`
Enabled bool `json:"enabled" pflag:", Boolean flag to enable or disable"`
}

type Auth struct {
TokenPath string `json:"tokenPath" pflag:", Token path"`
CaCertPath string `json:"caCertPath" pflag:", Certificate path"`
}

func (auth Auth) GetCA() ([]byte, error) {
cert, err := ioutil.ReadFile(auth.CaCertPath)
if err != nil {
return nil, errors.Wrap(err, "failed to read k8s CA cert from configured path")
}
return cert, nil
}

func (auth Auth) GetToken() (string, error) {
token, err := ioutil.ReadFile(auth.TokenPath)
if err != nil {
return "", errors.Wrap(err, "failed to read k8s bearer token from configured path")
}
return string(token), nil
}

// KubeClientConfig ...
func KubeClientConfig(host string, auth Auth) (*restclient.Config, error) {
tokenString, err := auth.GetToken()
if err != nil {
return nil, errors.New(fmt.Sprintf("Failed to get auth token: %+v", err))
}

caCert, err := auth.GetCA()
if err != nil {
return nil, errors.New(fmt.Sprintf("Failed to get auth CA: %+v", err))
}

tlsClientConfig := restclient.TLSClientConfig{}
tlsClientConfig.CAData = caCert
return &restclient.Config{
Host: host,
TLSClientConfig: tlsClientConfig,
BearerToken: tokenString,
}, nil
}
4 changes: 4 additions & 0 deletions go/tasks/plugins/k8s/ray/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ray

import (
pluginsConfig "github.com/flyteorg/flyteplugins/go/tasks/config"
pluginmachinery "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"
)

//go:generate pflags Config --default-var=defaultConfig
Expand Down Expand Up @@ -40,6 +41,9 @@ type Config struct {

// NodeIPAddress the IP address of the head node. By default, this is pod ip address.
NodeIPAddress string `json:"nodeIPAddress,omitempty"`

// Remote Ray Cluster Config
RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"`
}

func GetConfig() *Config {
Expand Down
26 changes: 26 additions & 0 deletions go/tasks/plugins/k8s/ray/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package ray

import (
"testing"

pluginmachinery "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s"
"gotest.tools/assert"
)

func TestLoadConfig(t *testing.T) {
rayConfig := GetConfig()
assert.Assert(t, rayConfig != nil)

t.Run("remote cluster", func(t *testing.T) {
config := GetConfig()
remoteConfig := pluginmachinery.ClusterConfig{
Enabled: false,
Endpoint: "",
Auth: pluginmachinery.Auth{
TokenPath: "",
CaCertPath: "",
},
}
assert.DeepEqual(t, config.RemoteClusterConfig, remoteConfig)
})
}
14 changes: 14 additions & 0 deletions go/tasks/plugins/k8s/ray/ray.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,5 +371,19 @@ func init() {
ResourceToWatch: &rayv1alpha1.RayJob{},
Plugin: rayJobResourceHandler{},
IsDefault: false,
CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) {
remoteConfig := GetConfig().RemoteClusterConfig
if !remoteConfig.Enabled {
// use controller-runtime KubeClient
return nil, nil
}

kubeConfig, err := k8s.KubeClientConfig(remoteConfig.Endpoint, remoteConfig.Auth)
if err != nil {
return nil, err
}

return k8s.NewDefaultKubeClient(kubeConfig)
},
})
}
7 changes: 7 additions & 0 deletions go/tasks/plugins/k8s/ray/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
plugins:
ray:
remoteClusterConfig:
endpoint: 127.0.0.1
auth:
tokenPath: /path/token
caCertPath: /path/cert