Skip to content

Commit

Permalink
Bug Fix and Perf Tweak #minor (flyteorg#248)
Browse files Browse the repository at this point in the history
* Bug Fix and Perf Tweak

 - Building a cache from Flyteadmin executions, should stop querying
flyteadmin, if the workflow is in terminal state
 - Default for Max streak length can be optimized to 8 (number of rounds
needed to complete one workflow start to end, with inmemory execution)
 - Other defaults set that have been hardened over time.

Signed-off-by: Ketan Umare <[email protected]>

* Updated testcase

Signed-off-by: Ketan Umare <[email protected]>

* lint fix

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Mar 31, 2021
1 parent ac66989 commit 0edfd3a
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 23 deletions.
2 changes: 1 addition & 1 deletion flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.10.0
github.com/flyteorg/flyteidl v0.18.25
github.com/flyteorg/flyteplugins v0.5.41
github.com/flyteorg/flyteplugins v0.5.42
github.com/flyteorg/flytestdlib v0.3.13
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ github.com/flyteorg/flyteidl v0.18.25 h1:XbHwM4G1u5nGAcdKod+ENgbL84cHdNzQIWY+Naj
github.com/flyteorg/flyteidl v0.18.25/go.mod h1:b5Fq4Z8a5b0mF6pEwTd48ufvikUGVkWSjZiMT0ZtqKI=
github.com/flyteorg/flyteplugins v0.5.41 h1:8n1Z55P59ICV4453Dk7fhaUbB944j3BMZ+ozywHczgU=
github.com/flyteorg/flyteplugins v0.5.41/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flyteplugins v0.5.42 h1:G4DRR2r8LlmkV+orXloDi1ly+M5WuvAaNlWFgGGyy3A=
github.com/flyteorg/flyteplugins v0.5.42/go.mod h1:ireF+bYk8xjw9BfcMbPN/hN5aZeBJpP0CoQYHkSRL+w=
github.com/flyteorg/flytestdlib v0.3.13 h1:5ioA/q3ixlyqkFh5kDaHgmPyTP/AHtqq1K/TIbVLUzM=
github.com/flyteorg/flytestdlib v0.3.13/go.mod h1:Tz8JCECAbX6VWGwFT6cmEQ+RJpZ/6L9pswu3fzWs220=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
Expand Down
91 changes: 73 additions & 18 deletions flytepropeller/pkg/controller/config/config.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,33 @@
// Package config contains the core configuration for FlytePropeller. This configuration can be added under the ``propeller`` section.
// Example config:
// ----------------
// propeller:
// rawoutput-prefix: s3://my-container/test/
// metadata-prefix: metadata/propeller/sandbox
// workers: 4
// workflow-reeval-duration: 10s
// downstream-eval-duration: 5s
// limit-namespace: "all"
// prof-port: 11254
// metrics-prefix: flyte
// enable-admin-launcher: true
// max-ttl-hours: 1
// gc-interval: 500m
// queue:
// type: batch
// queue:
// type: bucket
// rate: 20
// capacity: 100
// sub-queue:
// type: bucket
// rate: 100
// capacity: 1000
// # This config assumes using `make start` in flytesnacks repo to startup a DinD k3s container
// kube-config: "$HOME/kubeconfig/k3s/k3s.yaml"
// publish-k8s-events: true
// workflowStore:
// policy: "ResourceVersionCache"
package config

import (
Expand All @@ -15,22 +45,38 @@ var (
configSection = config.MustRegisterSection(configSectionKey, defaultConfig)

defaultConfig = &Config{
MaxWorkflowRetries: 5,
Workers: 20,
WorkflowReEval: config.Duration{
Duration: 10 * time.Second,
},
DownstreamEval: config.Duration{
Duration: 30 * time.Second,
},
MaxWorkflowRetries: 10,
MaxDatasetSizeBytes: 10 * 1024 * 1024,
Queue: CompositeQueueConfig{
Type: CompositeQueueSimple,
Type: CompositeQueueBatch,
BatchingInterval: config.Duration{
Duration: time.Second,
},
BatchSize: -1,
Queue: WorkqueueConfig{
Type: WorkqueueTypeDefault,
BaseDelay: config.Duration{Duration: time.Second * 10},
MaxDelay: config.Duration{Duration: time.Second * 10},
Rate: 10,
Capacity: 100,
Type: WorkqueueTypeMaxOfRateLimiter,
BaseDelay: config.Duration{Duration: time.Second * 5},
MaxDelay: config.Duration{Duration: time.Second * 60},
Rate: 100,
Capacity: 1000,
},
Sub: WorkqueueConfig{
Type: WorkqueueTypeBucketRateLimiter,
Rate: 100,
Capacity: 1000,
},
},
KubeConfig: KubeClientConfig{
QPS: 5,
Burst: 10,
Timeout: config.Duration{Duration: 0},
QPS: 100,
Burst: 25,
Timeout: config.Duration{Duration: 30 * time.Second},
},
LeaderElection: LeaderElectionConfig{
Enabled: false,
Expand All @@ -47,13 +93,20 @@ var (
MaxNodeRetriesOnSystemFailures: 3,
InterruptibleFailureThreshold: 1,
},
MaxStreakLength: 5, // Turbo mode is enabled by default
MaxStreakLength: 8, // Turbo mode is enabled by default
ProfilerPort: config.Port{
Port: 10254,
},
LimitNamespace: "all",
MetadataPrefix: "metadata/propeller",
EnableAdminLauncher: true,
MetricsPrefix: "flyte",
}
)

// NOTE: when adding new fields, do not mark them as "omitempty" if it's desirable to read the value from env variables.
// Config that uses the flytestdlib Config module to generate commandline and load config files. This configuration is
// the base configuration to start propeller
// NOTE: when adding new fields, do not mark them as "omitempty" if it's desirable to read the value from env variables.
type Config struct {
KubeConfigPath string `json:"kube-config" pflag:",Path to kubernetes client config file."`
MasterURL string `json:"master"`
Expand All @@ -78,6 +131,7 @@ type Config struct {
MaxStreakLength int `json:"max-streak-length" pflag:",Maximum number of consecutive rounds that one propeller worker can use for one workflow - >1 => turbo-mode is enabled."`
}

// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
type KubeClientConfig struct {
// QPS indicates the maximum QPS to the master from this client.
// If it's zero, the created RESTClient will use DefaultQPS: 5
Expand All @@ -96,6 +150,7 @@ const (
CompositeQueueBatch CompositeQueueType = "batch"
)

// CompositeQueueConfig contains configuration for the controller queue and the downstream resource queue
type CompositeQueueConfig struct {
Type CompositeQueueType `json:"type" pflag:",Type of composite queue to use for the WorkQueue"`
Queue WorkqueueConfig `json:"queue,omitempty" pflag:",Workflow workqueue configuration, affects the way the work is consumed from the queue."`
Expand All @@ -113,7 +168,7 @@ const (
WorkqueueTypeMaxOfRateLimiter WorkqueueType = "maxof"
)

// prototypical configuration to configure a workqueue. We may want to generalize this in a package like k8sutils
// WorkqueueConfig has the configuration to configure a workqueue. We may want to generalize this in a package like k8sutils
type WorkqueueConfig struct {
// Refer to https://github.com/kubernetes/client-go/tree/master/util/workqueue
Type WorkqueueType `json:"type" pflag:",Type of RateLimiter to use for the WorkQueue"`
Expand All @@ -123,21 +178,21 @@ type WorkqueueConfig struct {
Capacity int `json:"capacity" pflag:",Bucket capacity as number of items"`
}

// configuration for a node
// NodeConfig contains configuration that is useful for every node execution. The fields are loaded from the
// keys named in their json tags.
type NodeConfig struct {
// DefaultDeadlines groups the default timeout values (see the DefaultDeadlines type).
DefaultDeadlines DefaultDeadlines `json:"default-deadlines,omitempty" pflag:",Default value for timeouts"`
// MaxNodeRetriesOnSystemFailures is the maximum number of retries per node for node failures due to infra issues.
MaxNodeRetriesOnSystemFailures int64 `json:"max-node-retries-system-failures" pflag:"2,Maximum number of retries per node for node failure due to infra issues"`
// InterruptibleFailureThreshold is the number of failures for a node to be still considered interruptible.
InterruptibleFailureThreshold int64 `json:"interruptible-failure-threshold" pflag:"1,number of failures for a node to be still considered interruptible'"`
}

// Contains default values for timeouts
// DefaultDeadlines contains default values for timeouts. Each field is a config.Duration loaded from the key
// named in its json tag.
type DefaultDeadlines struct {
// DefaultNodeExecutionDeadline is the default value of the node execution timeout.
DefaultNodeExecutionDeadline config.Duration `json:"node-execution-deadline" pflag:",Default value of node execution timeout"`
// DefaultNodeActiveDeadline is the default value of the node timeout.
DefaultNodeActiveDeadline config.Duration `json:"node-active-deadline" pflag:",Default value of node timeout"`
// DefaultWorkflowActiveDeadline is the default value of the workflow timeout.
DefaultWorkflowActiveDeadline config.Duration `json:"workflow-active-deadline" pflag:",Default value of workflow timeout"`
}

// Contains leader election configuration.
// LeaderElectionConfig contains leader election configuration.
type LeaderElectionConfig struct {
// Enable or disable leader election.
Enabled bool `json:"enabled" pflag:",Enables/Disables leader election."`
Expand All @@ -156,12 +211,12 @@ type LeaderElectionConfig struct {
RetryPeriod config.Duration `json:"retry-period" pflag:",Duration the LeaderElector clients should wait between tries of actions."`
}

// Extracts the Configuration from the global config module in flytestdlib and returns the corresponding type-casted object.
// TODO What if the type is incorrect?
// GetConfig fetches the current configuration from the propeller config section registered with the flytestdlib
// config module and returns it as a *Config. The type assertion is unchecked, so a section holding any other
// type causes a panic.
func GetConfig() *Config {
	cfg := configSection.GetConfig().(*Config)
	return cfg
}

// MustRegisterSubSection registers section under subSectionKey as a sub-section of the propeller configuration
// section and returns the resulting config.Section. It can be used to configure any subsection of the propeller
// configuration.
func MustRegisterSubSection(subSectionKey string, section config.Config) config.Section {
	subSection := configSection.MustRegisterSection(subSectionKey, section)
	return subSection
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ import (
"google.golang.org/grpc/status"
)

// IsWorkflowTerminated reports whether the given workflow execution phase is a terminal phase. The phases treated
// as terminal are ABORTED, FAILED, SUCCEEDED and TIMED_OUT; every other phase yields false.
func IsWorkflowTerminated(p core.WorkflowExecution_Phase) bool {
	switch p {
	case core.WorkflowExecution_ABORTED,
		core.WorkflowExecution_FAILED,
		core.WorkflowExecution_SUCCEEDED,
		core.WorkflowExecution_TIMED_OUT:
		return true
	default:
		return false
	}
}

// Executor for Launchplans that executes on a remote FlyteAdmin service (if configured)
type adminLaunchPlanExecutor struct {
adminClient service.AdminServiceClient
Expand Down Expand Up @@ -141,6 +147,21 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
resp = make([]cache.ItemSyncResponse, 0, len(batch))
for _, obj := range batch {
exec := obj.GetItem().(executionCacheItem)

// If the workflow is already terminated, there is no need to fetch information; also the item can be dropped from the cache
if exec.ExecutionClosure != nil {
if IsWorkflowTerminated(exec.ExecutionClosure.Phase) {
logger.Debugf(ctx, "Workflow [%s] is already completed, will not fetch execution information", exec.ExecutionClosure.WorkflowId)
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: exec,
Action: cache.Unchanged,
})
continue
}
}

// Workflow is not already terminated, let's check the status
req := &admin.WorkflowExecutionGetRequest{
Id: &exec.WorkflowExecutionIdentifier,
}
Expand All @@ -167,6 +188,7 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
continue
}

// Update the cache with the retrieved status
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"testing"
"time"

"github.com/flyteorg/flytestdlib/cache"
mocks2 "github.com/flyteorg/flytestdlib/cache/mocks"

"github.com/flyteorg/flytestdlib/promutils"

"github.com/flyteorg/flyteidl/clients/go/admin/mocks"
Expand Down Expand Up @@ -39,6 +42,25 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
assert.Equal(t, result, s)
})

t.Run("terminal-sync", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}
exec, err := NewAdminLaunchPlanExecutor(ctx, mockClient, time.Millisecond, defaultAdminConfig, promutils.NewTestScope())
assert.NoError(t, err)
iwMock := &mocks2.ItemWrapper{}
i := executionCacheItem{ExecutionClosure: &admin.ExecutionClosure{Phase: core.WorkflowExecution_SUCCEEDED, WorkflowId: &core.Identifier{Project: "p"}}}
iwMock.OnGetItem().Return(i)
iwMock.OnGetID().Return("id")
adminExec := exec.(*adminLaunchPlanExecutor)
v, err := adminExec.syncItem(ctx, cache.Batch{
iwMock,
})
assert.NoError(t, err)
assert.NotNil(t, v)
assert.Len(t, v, 1)
assert.Equal(t, v[0].ID, "id")
assert.Equal(t, v[0].Item, i)
})

t.Run("notFound", func(t *testing.T) {
mockClient := &mocks.AdminServiceClient{}

Expand Down Expand Up @@ -130,6 +152,7 @@ func TestAdminLaunchPlanExecutor_GetStatus(t *testing.T) {
assert.Nil(t, s)
assert.False(t, IsNotFound(err))
})

}

func TestAdminLaunchPlanExecutor_Launch(t *testing.T) {
Expand Down Expand Up @@ -313,3 +336,15 @@ func TestNewAdminLaunchPlanExecutor_GetLaunchPlan(t *testing.T) {
assert.Error(t, err)
})
}

func TestIsWorkflowTerminated(t *testing.T) {
assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_SUCCEEDED))
assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_ABORTED))
assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_FAILED))
assert.True(t, IsWorkflowTerminated(core.WorkflowExecution_TIMED_OUT))
assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_SUCCEEDING))
assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_FAILING))
assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_RUNNING))
assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_QUEUED))
assert.False(t, IsWorkflowTerminated(core.WorkflowExecution_UNDEFINED))
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

var (
defaultAdminConfig = &AdminConfig{
TPS: 5,
TPS: 100,
Burst: 10,
MaxCacheSize: 10000,
Workers: 10,
Expand All @@ -17,6 +17,8 @@ var (
adminConfigSection = ctrlConfig.MustRegisterSubSection("admin-launcher", defaultAdminConfig)
)

// AdminConfig provides an "admin-launcher" section in the core Flytepropeller configuration and can be used to configure
// the rate at which Flytepropeller can query for the status of workflows in flyteadmin or create new executions
type AdminConfig struct {
// TPS indicates the maximum transactions per second to flyte admin from this client.
// If it's zero, the created client will use DefaultTPS: 5
Expand Down
13 changes: 10 additions & 3 deletions flytepropeller/pkg/controller/workflowstore/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,26 @@ import (
type Policy = string

const (
PolicyInMemory = "InMemory"
PolicyPassThrough = "PassThrough"
// PolicyInMemory provides an inmemory Workflow store which is useful for testing
PolicyInMemory = "InMemory"
// PolicyPassThrough just calls the underlying Clientset or the shared informer cache to get or write the workflow
PolicyPassThrough = "PassThrough"
// PolicyResourceVersionCache uses the resource version on the Workflow object, to determine if the inmemory copy
// of the workflow is stale
PolicyResourceVersionCache = "ResourceVersionCache"
)

// By default we will use the ResourceVersionCache policy
var (
defaultConfig = &Config{
Policy: PolicyPassThrough,
Policy: PolicyResourceVersionCache,
}

configSection = ctrlConfig.MustRegisterSubSection("workflowStore", defaultConfig)
)

// Config holds the configuration for Workflow access in the controller.
// Various policies are available like - InMemory, PassThrough, ResourceVersionCache
type Config struct {
// Policy is the workflow store policy to initialize; it is loaded from the "policy" key.
Policy Policy `json:"policy" pflag:",Workflow Store Policy to initialize"`
}
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/pkg/controller/workflowstore/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const (
PriorityClassRegular
)

// FlyteWorkflow store interface provides an abstraction of accessing the actual FlyteWorkflow object.
type FlyteWorkflow interface {
Get(ctx context.Context, namespace, name string) (*v1alpha1.FlyteWorkflow, error)
UpdateStatus(ctx context.Context, workflow *v1alpha1.FlyteWorkflow, priorityClass PriorityClass) (
Expand Down

0 comments on commit 0edfd3a

Please sign in to comment.