Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
[wip] Refactors Flyteadmin to make every command, externally invokable (
Browse files Browse the repository at this point in the history
#370)

* SQLite support

Signed-off-by: Ketan Umare <[email protected]>

* Move migrate commands

Signed-off-by: Ketan Umare <[email protected]>

* Clusterresource controller refactor

Signed-off-by: Ketan Umare <[email protected]>

* lint fix

Signed-off-by: Ketan Umare <[email protected]>

* test fix

Signed-off-by: Ketan Umare <[email protected]>

* updated

Signed-off-by: Ketan Umare <[email protected]>

* Updated to handle additional servers

Signed-off-by: Ketan Umare <[email protected]>

* lint fix

Signed-off-by: Ketan Umare <[email protected]>
  • Loading branch information
kumare3 authored Mar 24, 2022
1 parent 4648a2e commit 76af9b2
Show file tree
Hide file tree
Showing 14 changed files with 648 additions and 589 deletions.
1 change: 1 addition & 0 deletions flyteadmin/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ node_modules/

.virtualgo
boilerplate/lyft/end2end/tmp
dist
74 changes: 15 additions & 59 deletions flyteadmin/cmd/entrypoints/clusterresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,12 @@ package entrypoints
import (
"context"

"github.com/flyteorg/flyteadmin/pkg/repositories/errors"
errors2 "github.com/pkg/errors"

"github.com/flyteorg/flyteadmin/pkg/clusterresource/impl"
"github.com/flyteorg/flyteadmin/pkg/clusterresource/interfaces"
execClusterIfaces "github.com/flyteorg/flyteadmin/pkg/executioncluster/interfaces"
"github.com/flyteorg/flyteadmin/pkg/manager/impl/resources"
"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/flyteorg/flyteidl/clients/go/admin"

"github.com/flyteorg/flyteadmin/pkg/clusterresource"
"github.com/flyteorg/flyteadmin/pkg/config"
executioncluster "github.com/flyteorg/flyteadmin/pkg/executioncluster/impl"
"github.com/flyteorg/flyteadmin/pkg/runtime"
runtimeInterfaces "github.com/flyteorg/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flytestdlib/logger"

"github.com/spf13/cobra"
Expand All @@ -30,74 +20,40 @@ var parentClusterResourceCmd = &cobra.Command{
Short: "This command administers the ClusterResourceController. Please choose a subcommand.",
}

func getClusterResourceController(ctx context.Context, scope promutils.Scope, configuration runtimeInterfaces.Configuration) clusterresource.Controller {
initializationErrorCounter := scope.MustNewCounter(
"flyteclient_initialization_error",
"count of errors encountered initializing a flyte client from kube config")
var listTargetsProvider execClusterIfaces.ListTargetsInterface
var err error
if len(configuration.ClusterConfiguration().GetClusterConfigs()) == 0 {
serverConfig := config.GetConfig()
listTargetsProvider, err = executioncluster.NewInCluster(initializationErrorCounter, serverConfig.KubeConfig, serverConfig.Master)
} else {
listTargetsProvider, err = executioncluster.NewListTargets(initializationErrorCounter, executioncluster.NewExecutionTargetProvider(), configuration.ClusterConfiguration())
}
if err != nil {
panic(err)
}

var adminDataProvider interfaces.FlyteAdminDataProvider
if configuration.ClusterResourceConfiguration().IsStandaloneDeployment() {
clientSet, err := admin.ClientSetBuilder().WithConfig(admin.GetConfig(ctx)).Build(ctx)
if err != nil {
panic(err)
}
adminDataProvider = impl.NewAdminServiceDataProvider(clientSet.AdminClient())
} else {
dbConfig := runtime.NewConfigurationProvider().ApplicationConfiguration().GetDbConfig()
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, dbConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
dbScope := scope.NewSubScope("db")

repo := repositories.NewGormRepo(
db, errors.NewPostgresErrorTransformer(dbScope.NewSubScope("errors")), dbScope)

adminDataProvider = impl.NewDatabaseAdminDataProvider(repo, configuration, resources.NewResourceManager(repo, configuration.ApplicationConfiguration()))
}

return clusterresource.NewClusterResourceController(adminDataProvider, listTargetsProvider, scope)
}

var controllerRunCmd = &cobra.Command{
Use: "run",
Short: "This command will start a cluster resource controller to periodically sync cluster resources",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
scope := promutils.NewScope(configuration.ApplicationConfiguration().GetTopLevelConfig().MetricsScope).NewSubScope("clusterresource")
clusterResourceController := getClusterResourceController(ctx, scope, configuration)
clusterResourceController, err := clusterresource.NewClusterResourceControllerFromConfig(ctx, scope, configuration)
if err != nil {
return err
}
clusterResourceController.Run()
logger.Infof(ctx, "ClusterResourceController started running successfully")
return nil
},
}

var controllerSyncCmd = &cobra.Command{
Use: "sync",
Short: "This command will sync cluster resources",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
scope := promutils.NewScope(configuration.ApplicationConfiguration().GetTopLevelConfig().MetricsScope).NewSubScope("clusterresource")
clusterResourceController := getClusterResourceController(ctx, scope, configuration)
err := clusterResourceController.Sync(ctx)
clusterResourceController, err := clusterresource.NewClusterResourceControllerFromConfig(ctx, scope, configuration)
if err != nil {
return err
}
err = clusterResourceController.Sync(ctx)
if err != nil {
logger.Fatalf(ctx, "Failed to sync cluster resources [%+v]", err)
return errors2.Wrap(err, "Failed to sync cluster resources ")
}
logger.Infof(ctx, "ClusterResourceController synced successfully")
return nil
},
}

Expand Down
98 changes: 7 additions & 91 deletions flyteadmin/cmd/entrypoints/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@ package entrypoints
import (
"context"

"github.com/flyteorg/flyteadmin/pkg/repositories"
"github.com/flyteorg/flyteadmin/pkg/repositories/config"
"github.com/flyteorg/flyteadmin/pkg/runtime"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flyteadmin/pkg/server"

"github.com/go-gormigrate/gormigrate/v2"
"github.com/spf13/cobra"
_ "gorm.io/driver/postgres" // Required to import database driver.
)
Expand All @@ -22,109 +18,29 @@ var parentMigrateCmd = &cobra.Command{
var migrateCmd = &cobra.Command{
Use: "run",
Short: "This command will run all the migrations for the database",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
databaseConfig := configuration.ApplicationConfiguration().GetDbConfig()
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, databaseConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
sqlDB, err := db.DB()
if err != nil {
logger.Fatal(ctx, err)
}

defer func(deferCtx context.Context) {
if err = sqlDB.Close(); err != nil {
logger.Fatal(deferCtx, err)
}
}(ctx)

if err = sqlDB.Ping(); err != nil {
logger.Fatal(ctx, err)
}
m := gormigrate.New(db, gormigrate.DefaultOptions, config.Migrations)
if err = m.Migrate(); err != nil {
logger.Fatalf(ctx, "Could not migrate: %v", err)
}
logger.Infof(ctx, "Migration ran successfully")
return server.Migrate(ctx)
},
}

// Rollback the latest migration
var rollbackCmd = &cobra.Command{
Use: "rollback",
Short: "This command will rollback one migration",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
databaseConfig := configuration.ApplicationConfiguration().GetDbConfig()
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, databaseConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}
sqlDB, err := db.DB()
if err != nil {
logger.Fatal(ctx, err)
}
defer func(deferCtx context.Context) {
if err = sqlDB.Close(); err != nil {
logger.Fatal(deferCtx, err)
}
}(ctx)

if err = sqlDB.Ping(); err != nil {
logger.Fatal(ctx, err)
}

m := gormigrate.New(db, gormigrate.DefaultOptions, config.Migrations)
err = m.RollbackLast()
if err != nil {
logger.Fatalf(ctx, "Could not rollback latest migration: %v", err)
}
logger.Infof(ctx, "Rolled back one migration successfully")
return server.Rollback(ctx)
},
}

// This seeds the database with project values
var seedProjectsCmd = &cobra.Command{
Use: "seed-projects",
Short: "Seed projects in the database.",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
databaseConfig := configuration.ApplicationConfiguration().GetDbConfig()
logConfig := logger.GetConfig()

db, err := repositories.GetDB(ctx, databaseConfig, logConfig)
if err != nil {
logger.Fatal(ctx, err)
}

sqlDB, err := db.DB()
if err != nil {
logger.Fatal(ctx, err)
}

defer func(deferCtx context.Context) {
if err = sqlDB.Close(); err != nil {
logger.Fatal(deferCtx, err)
}
}(ctx)

if err = sqlDB.Ping(); err != nil {
logger.Fatal(ctx, err)
}

if err = config.SeedProjects(db, args); err != nil {
logger.Fatalf(ctx, "Could not add projects to database with err: %v", err)
}
logger.Infof(ctx, "Successfully added projects to database")
return server.SeedProjects(ctx, args)
},
}

Expand Down
Loading

0 comments on commit 76af9b2

Please sign in to comment.