Skip to content

Commit

Permalink
Removed retries during precheck and falling back on k8s retries (flyt…
Browse files Browse the repository at this point in the history
…eorg#306)

* Removed retries during precheck and falling back on k8s retries

Signed-off-by: Prafulla Mahindrakar <[email protected]>

* Linter fixes

Signed-off-by: Prafulla Mahindrakar <[email protected]>
  • Loading branch information
pmahindrakar-oss authored Dec 10, 2021
1 parent 69a0edf commit 4df0c48
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 45 deletions.
45 changes: 13 additions & 32 deletions flyteadmin/cmd/scheduler/entrypoints/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,16 @@ import (
"context"
"fmt"

"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flyteidl/clients/go/admin"
"github.com/flyteorg/flytestdlib/logger"

"github.com/spf13/cobra"
"google.golang.org/grpc/health/grpc_health_v1"
"k8s.io/client-go/util/retry"
)

const (
healthCheckSuccess = "Health check passed, Flyteadmin is up and running"
healthCheckError = "Health check failed with status %v"
healthCheckError = "health check failed with status %v"
)

var preCheckRunCmd = &cobra.Command{
Expand All @@ -24,40 +22,23 @@ var preCheckRunCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()

appConfig := runtime.NewApplicationConfigurationProvider()
opts := appConfig.GetSchedulerConfig().GetPrecheckBackoff()

err := retry.OnError(opts,
func(err error) bool {
logger.Errorf(ctx, "Attempt failed due to %v", err)
return err != nil
},
func() error {
clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx)

if err != nil {
logger.Errorf(ctx, "Flyte native scheduler precheck failed due to %v\n", err)
return err
}

healthCheckResponse, err := clientSet.HealthServiceClient().Check(ctx,
&grpc_health_v1.HealthCheckRequest{Service: "flyteadmin"})
if err != nil {
return err
}
if healthCheckResponse.GetStatus() != grpc_health_v1.HealthCheckResponse_SERVING {
logger.Errorf(ctx, healthCheckError, healthCheckResponse.GetStatus())
return fmt.Errorf(healthCheckError, healthCheckResponse.GetStatus())
}
logger.Infof(ctx, "Health check response is %v", healthCheckResponse)
return nil
},
)
clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx)

if err != nil {
logger.Errorf(ctx, "Flyte native scheduler precheck failed due to %v\n", err)
return err
}

healthCheckResponse, err := clientSet.HealthServiceClient().Check(ctx,
&grpc_health_v1.HealthCheckRequest{Service: "flyteadmin"})
if err != nil {
return err
}
if healthCheckResponse.GetStatus() != grpc_health_v1.HealthCheckResponse_SERVING {
logger.Errorf(ctx, healthCheckError, healthCheckResponse.GetStatus())
return fmt.Errorf(healthCheckError, healthCheckResponse.GetStatus())
}
logger.Infof(ctx, "Health check response is %v", healthCheckResponse)
logger.Infof(ctx, healthCheckSuccess)
return nil
},
Expand Down
6 changes: 0 additions & 6 deletions flyteadmin/pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@ import (
"io/ioutil"
"os"
"strings"
"time"

"github.com/flyteorg/flyteadmin/pkg/common"
"github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/config"
"github.com/flyteorg/flytestdlib/logger"

"k8s.io/apimachinery/pkg/util/wait"
)

const database = "database"
Expand Down Expand Up @@ -59,9 +56,6 @@ var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.Schedule
},
},
},
PrecheckBackoff: wait.Backoff{
Duration: time.Second, Factor: 2.0, Steps: 30, Jitter: 0.1,
},
})
var remoteDataConfig = config.MustRegisterSection(remoteData, &interfaces.RemoteDataConfig{
Scheme: common.None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package interfaces
import (
"github.com/flyteorg/flytestdlib/config"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/util/wait"
)

// This configuration section is used to for initiating the database connection with the store that holds registered
Expand Down Expand Up @@ -276,8 +275,6 @@ type SchedulerConfig struct {
ReconnectAttempts int `json:"reconnectAttempts"`
// Specifies the time interval to wait before attempting to reconnect the workflow executor client.
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
// Specifies the backoff settings when scheduler checks for the flyteadmin health during startup.
PrecheckBackoff wait.Backoff `json:"backoff"`
}

func (s *SchedulerConfig) GetEventSchedulerConfig() EventSchedulerConfig {
Expand All @@ -296,10 +293,6 @@ func (s *SchedulerConfig) GetReconnectDelaySeconds() int {
return s.ReconnectDelaySeconds
}

func (s *SchedulerConfig) GetPrecheckBackoff() wait.Backoff {
return s.PrecheckBackoff
}

// Configuration specific to setting up signed urls.
type SignedURL struct {
// Whether signed urls should even be returned with GetExecutionData, GetNodeExecutionData and GetTaskExecutionData
Expand Down

0 comments on commit 4df0c48

Please sign in to comment.