Skip to content

Commit

Permalink
Implementation of Weighted Clusters and domain based routing (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
anandswaminathan authored Oct 15, 2019
1 parent 8edb4fc commit 263f775
Show file tree
Hide file tree
Showing 22 changed files with 594 additions and 342 deletions.
191 changes: 95 additions & 96 deletions flyteadmin/Gopkg.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion flyteadmin/Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
[[override]]
name = "github.com/lyft/flytestdlib"
source = "https://github.com/lyft/flytestdlib"
version = "^v0.2.12"
version = "^v0.2.22"

[[constraint]]
name = "github.com/magiconair/properties"
Expand Down
7 changes: 3 additions & 4 deletions flyteadmin/cmd/entrypoints/clusterresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ package entrypoints
import (
"context"

"github.com/lyft/flyteadmin/pkg/executioncluster"

"github.com/lyft/flyteadmin/pkg/clusterresource"
executioncluster "github.com/lyft/flyteadmin/pkg/executioncluster/impl"

"github.com/lyft/flyteadmin/pkg/runtime"

Expand Down Expand Up @@ -57,7 +56,7 @@ var controllerRunCmd = &cobra.Command{
scope.NewSubScope("cluster"),
cfg.KubeConfig,
cfg.Master,
configuration.ClusterConfiguration())
configuration)

clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope)
clusterResourceController.Run()
Expand Down Expand Up @@ -89,7 +88,7 @@ var controllerSyncCmd = &cobra.Command{
scope.NewSubScope("cluster"),
cfg.KubeConfig,
cfg.Master,
configuration.ClusterConfiguration())
configuration)

clusterResourceController := clusterresource.NewClusterResourceController(db, executionCluster, scope)
err := clusterResourceController.Sync(ctx)
Expand Down
8 changes: 4 additions & 4 deletions flyteadmin/pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"strings"
"time"

"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"

"github.com/lyft/flyteadmin/pkg/executioncluster"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/lyft/flyteadmin/pkg/common"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
Expand Down Expand Up @@ -68,7 +68,7 @@ type NamespaceCache = map[NamespaceName]LastModTimeCache
type controller struct {
db repositories.RepositoryInterface
config runtimeInterfaces.Configuration
executionCluster executioncluster.ClusterInterface
executionCluster interfaces.ClusterInterface
poller chan struct{}
metrics controllerMetrics
lastAppliedTemplateDir string
Expand Down Expand Up @@ -340,7 +340,7 @@ func newMetrics(scope promutils.Scope) controllerMetrics {
}
}

func NewClusterResourceController(db repositories.RepositoryInterface, executionCluster executioncluster.ClusterInterface, scope promutils.Scope) Controller {
func NewClusterResourceController(db repositories.RepositoryInterface, executionCluster interfaces.ClusterInterface, scope promutils.Scope) Controller {
config := runtime.NewConfigurationProvider()
return &controller{
db: db,
Expand Down
43 changes: 7 additions & 36 deletions flyteadmin/pkg/executioncluster/execution_target.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package executioncluster

import (
"github.com/lyft/flyteadmin/pkg/flytek8s"
runtime "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
flyteclient "github.com/lyft/flytepropeller/pkg/client/clientset/versioned"
"github.com/lyft/flytestdlib/promutils"
"k8s.io/client-go/rest"
"github.com/lyft/flytestdlib/random"

"sigs.k8s.io/controller-runtime/pkg/client"
)

// Spec to determine the execution target
type ExecutionTargetSpec struct {
TargetID string
TargetID string
ExecutionID *core.WorkflowExecutionIdentifier
}

// Client object of the target execution cluster
Expand All @@ -22,35 +22,6 @@ type ExecutionTarget struct {
Enabled bool
}

func getRestClientFromKubeConfig(scope promutils.Scope, kubeConfiguration *rest.Config) (*flyteclient.Clientset, error) {
fc, err := flyteclient.NewForConfig(kubeConfiguration)
if err != nil {
scope.MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config").Inc()
return nil, err
}
return fc, nil
}

// Creates a new Execution target for a cluster based on config passed in.
func NewExecutionTarget(scope promutils.Scope, k8sCluster runtime.ClusterConfig) (*ExecutionTarget, error) {
kubeConf, err := flytek8s.GetRestClientConfigForCluster(k8sCluster)
if err != nil {
return nil, err
}
flyteClient, err := getRestClientFromKubeConfig(scope, kubeConf)
if err != nil {
return nil, err
}
client, err := client.New(kubeConf, client.Options{})
if err != nil {
return nil, err
}
return &ExecutionTarget{
FlyteClient: flyteClient,
Client: client,
ID: k8sCluster.Name,
Enabled: k8sCluster.Enabled,
}, nil
func (e ExecutionTarget) Compare(to random.Comparable) bool {
return e.ID < to.(ExecutionTarget).ID
}
40 changes: 0 additions & 40 deletions flyteadmin/pkg/executioncluster/factory.go

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package impl

import (
"github.com/lyft/flyteadmin/pkg/executioncluster"
"github.com/lyft/flyteadmin/pkg/flytek8s"
runtime "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
flyteclient "github.com/lyft/flytepropeller/pkg/client/clientset/versioned"
"github.com/lyft/flytestdlib/promutils"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type clusterExecutionTargetProvider struct{}

// Creates a new Execution target for a cluster based on config passed in.
func (c *clusterExecutionTargetProvider) GetExecutionTarget(scope promutils.Scope, k8sCluster runtime.ClusterConfig) (*executioncluster.ExecutionTarget, error) {
kubeConf, err := flytek8s.GetRestClientConfigForCluster(k8sCluster)
if err != nil {
return nil, err
}
flyteClient, err := getRestClientFromKubeConfig(scope, kubeConf)
if err != nil {
return nil, err
}
client, err := client.New(kubeConf, client.Options{})
if err != nil {
return nil, err
}
return &executioncluster.ExecutionTarget{
FlyteClient: flyteClient,
Client: client,
ID: k8sCluster.Name,
Enabled: k8sCluster.Enabled,
}, nil
}

func getRestClientFromKubeConfig(scope promutils.Scope, kubeConfiguration *rest.Config) (*flyteclient.Clientset, error) {
fc, err := flyteclient.NewForConfig(kubeConfiguration)
if err != nil {
scope.MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config").Inc()
return nil, err
}
return fc, nil
}
24 changes: 24 additions & 0 deletions flyteadmin/pkg/executioncluster/impl/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package impl

import (
executioncluster_interface "github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"
"github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flytestdlib/promutils"
)

func GetExecutionCluster(scope promutils.Scope, kubeConfig, master string, config interfaces.Configuration) executioncluster_interface.ClusterInterface {
switch len(config.ClusterConfiguration().GetClusterConfigs()) {
case 0:
cluster, err := NewInCluster(scope, kubeConfig, master)
if err != nil {
panic(err)
}
return cluster
default:
cluster, err := NewRandomClusterSelector(scope, config.ClusterConfiguration(), &clusterExecutionTargetProvider{}, config.ApplicationConfiguration().GetDomainsConfig())
if err != nil {
panic(err)
}
return cluster
}
}
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
package executioncluster
package impl

import (
"fmt"

"github.com/lyft/flyteadmin/pkg/executioncluster"
"github.com/lyft/flyteadmin/pkg/executioncluster/interfaces"
"github.com/lyft/flyteadmin/pkg/flytek8s"
"github.com/lyft/flytestdlib/promutils"
"github.com/pkg/errors"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type InCluster struct {
target ExecutionTarget
target executioncluster.ExecutionTarget
}

func (i InCluster) GetTarget(spec *ExecutionTargetSpec) (*ExecutionTarget, error) {
func (i InCluster) GetTarget(spec *executioncluster.ExecutionTargetSpec) (*executioncluster.ExecutionTarget, error) {
if spec != nil && spec.TargetID != "" {
return nil, errors.New(fmt.Sprintf("remote target %s is not supported", spec.TargetID))
}
return &i.target, nil
}

func (i InCluster) GetAllValidTargets() []ExecutionTarget {
return []ExecutionTarget{
func (i InCluster) GetAllValidTargets() []executioncluster.ExecutionTarget {
return []executioncluster.ExecutionTarget{
i.target,
}
}

func NewInCluster(scope promutils.Scope, kubeConfig, master string) (ClusterInterface, error) {
func NewInCluster(scope promutils.Scope, kubeConfig, master string) (interfaces.ClusterInterface, error) {
clientConfig, err := flytek8s.GetRestClientConfig(kubeConfig, master, nil)
if err != nil {
return nil, err
Expand All @@ -40,7 +42,7 @@ func NewInCluster(scope promutils.Scope, kubeConfig, master string) (ClusterInte
return nil, err
}
return &InCluster{
target: ExecutionTarget{
target: executioncluster.ExecutionTarget{
Client: client,
FlyteClient: flyteClient,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package executioncluster
package impl

import (
"testing"

"github.com/lyft/flyteadmin/pkg/executioncluster"

"github.com/stretchr/testify/assert"
)

func TestInClusterGetTarget(t *testing.T) {
cluster := InCluster{
target: ExecutionTarget{
target: executioncluster.ExecutionTarget{
ID: "t1",
},
}
Expand All @@ -19,15 +21,15 @@ func TestInClusterGetTarget(t *testing.T) {

func TestInClusterGetRemoteTarget(t *testing.T) {
cluster := InCluster{
target: ExecutionTarget{},
target: executioncluster.ExecutionTarget{},
}
_, err := cluster.GetTarget(&ExecutionTargetSpec{TargetID: "t1"})
_, err := cluster.GetTarget(&executioncluster.ExecutionTargetSpec{TargetID: "t1"})
assert.EqualError(t, err, "remote target t1 is not supported")
}

func TestInClusterGetAllValidTargets(t *testing.T) {
cluster := InCluster{
target: ExecutionTarget{
target: executioncluster.ExecutionTarget{
ID: "t1",
},
}
Expand Down
Loading

0 comments on commit 263f775

Please sign in to comment.