Skip to content

Commit

Permalink
Refactors datacatalog to make every command, externally invokable (fl…
Browse files Browse the repository at this point in the history
  • Loading branch information
pingsutw authored Apr 6, 2022
1 parent dbbe689 commit b45ed42
Show file tree
Hide file tree
Showing 20 changed files with 363 additions and 619 deletions.
71 changes: 2 additions & 69 deletions cmd/entrypoints/migrate.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package entrypoints

import (
"reflect"

"github.com/flyteorg/datacatalog/pkg/repositories"
errors2 "github.com/flyteorg/datacatalog/pkg/repositories/errors"
"github.com/flyteorg/datacatalog/pkg/runtime"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"

"github.com/jackc/pgconn"

"context"

Expand All @@ -21,72 +13,13 @@ var parentMigrateCmd = &cobra.Command{
Short: "This command controls migration behavior for the Flyte Catalog database. Please choose a subcommand.",
}

var migrationsScope = promutils.NewScope("migrations")
var migrateScope = migrationsScope.NewSubScope("migrate")

// all postgres servers come by default with a db name named postgres
const defaultDB = "postgres"
const pqInvalidDBCode = "3D000"

// This runs all the migrations
var migrateCmd = &cobra.Command{
Use: "run",
Short: "This command will run all the migrations for the database",
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
configProvider := runtime.NewConfigurationProvider()
dbConfigValues := configProvider.ApplicationConfiguration().GetDbConfig()

dbName := dbConfigValues.DbName
dbHandle, err := repositories.NewDBHandle(dbConfigValues, migrateScope)

if err != nil {
// if db does not exist, try creating it
cErr, ok := err.(errors2.ConnectError)
if !ok {
logger.Errorf(ctx, "Failed to cast error of type: %v, err: %v", reflect.TypeOf(err),
err)
panic(err)
}
pqError := cErr.Unwrap().(*pgconn.PgError)
if pqError.Code == pqInvalidDBCode {
logger.Warningf(ctx, "Database [%v] does not exist, trying to create it now", dbName)

dbConfigValues.DbName = defaultDB
setupDBHandler, err := repositories.NewDBHandle(dbConfigValues, migrateScope)
if err != nil {
logger.Errorf(ctx, "Failed to connect to default DB %v, err %v", defaultDB, err)
panic(err)
}

// Create the database if it doesn't exist
// NOTE: this is non-destructive - if for some reason one does exist an err will be thrown
err = setupDBHandler.CreateDB(dbName)
if err != nil {
logger.Errorf(ctx, "Failed to create DB %v err %v", dbName, err)
panic(err)
}

dbConfigValues.DbName = dbName
dbHandle, err = repositories.NewDBHandle(dbConfigValues, migrateScope)
if err != nil {
logger.Errorf(ctx, "Failed to connect DB err %v", err)
panic(err)
}
} else {
logger.Errorf(ctx, "Failed to connect DB err %v", err)
panic(err)
}
}

logger.Infof(ctx, "Created DB connection.")

// TODO: checkpoints for migrations
if err := dbHandle.Migrate(ctx); err != nil {
logger.Errorf(ctx, "Failed to migrate. err: %v", err)
panic(err)
}
logger.Infof(ctx, "Ran DB migration successfully.")
return repositories.Migrate(ctx)
},
}

Expand Down
57 changes: 8 additions & 49 deletions cmd/entrypoints/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,14 @@ package entrypoints

import (
"context"
"net"
"net/http"

"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

"github.com/flyteorg/datacatalog/pkg/config"
"github.com/flyteorg/datacatalog/pkg/rpc/datacatalogservice"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog"
"github.com/flyteorg/flytestdlib/logger"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
)

var serveCmd = &cobra.Command{
Expand All @@ -25,56 +21,19 @@ var serveCmd = &cobra.Command{

// serve a http healthcheck endpoint
go func() {
err := serveHTTPHealthcheck(ctx, cfg)
err := datacatalogservice.ServeHTTPHealthCheck(ctx, cfg)
if err != nil {
logger.Errorf(ctx, "Unable to serve http", config.GetConfig().GetHTTPHostAddress(), err)
}
}()

return serveInsecure(ctx, cfg)
// Set Keys
labeled.SetMetricKeys(contextutils.AppNameKey, contextutils.ProjectKey, contextutils.DomainKey)

return datacatalogservice.ServeInsecure(ctx, cfg)
},
}

func init() {
RootCmd.AddCommand(serveCmd)
}

// Create and start the gRPC server
func serveInsecure(ctx context.Context, cfg *config.Config) error {
grpcServer := newGRPCServer(ctx, cfg)

grpcListener, err := net.Listen("tcp", cfg.GetGrpcHostAddress())
if err != nil {
return err
}

logger.Infof(ctx, "Serving DataCatalog Insecure on port %v", config.GetConfig().GetGrpcHostAddress())
return grpcServer.Serve(grpcListener)
}

// Creates a new GRPC Server with all the configuration
func newGRPCServer(_ context.Context, cfg *config.Config) *grpc.Server {
grpcServer := grpc.NewServer()
datacatalog.RegisterDataCatalogServer(grpcServer, datacatalogservice.NewDataCatalogService())

healthServer := health.NewServer()
healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)

if cfg.GrpcServerReflection {
reflection.Register(grpcServer)
}
return grpcServer
}

func serveHTTPHealthcheck(ctx context.Context, cfg *config.Config) error {
mux := http.NewServeMux()

// Register Healthcheck
mux.HandleFunc("/healthcheck", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

logger.Infof(ctx, "Serving DataCatalog http on port %v", cfg.GetHTTPHostAddress())
return http.ListenAndServe(cfg.GetHTTPHostAddress(), mux)
}
38 changes: 1 addition & 37 deletions cmd/entrypoints/serve_dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ package entrypoints

import (
"context"
"net"

"github.com/flyteorg/datacatalog/pkg/config"
"github.com/flyteorg/datacatalog/pkg/rpc/datacatalogservice"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog"
"github.com/flyteorg/flytestdlib/logger"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)

var serveDummyCmd = &cobra.Command{
Expand All @@ -19,41 +14,10 @@ var serveDummyCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background()
cfg := config.GetConfig()
return serveDummy(ctx, cfg)
return datacatalogservice.Serve(ctx, cfg)
},
}

func init() {
RootCmd.AddCommand(serveDummyCmd)
}

// Create and start the gRPC server and http healthcheck endpoint
func serveDummy(ctx context.Context, cfg *config.Config) error {
// serve a http healthcheck endpoint
go func() {
err := serveHTTPHealthcheck(ctx, cfg)
if err != nil {
logger.Errorf(ctx, "Unable to serve http", cfg.GetGrpcHostAddress(), err)
}
}()

grpcServer := newGRPCDummyServer(ctx, cfg)

grpcListener, err := net.Listen("tcp", cfg.GetGrpcHostAddress())
if err != nil {
return err
}

logger.Infof(ctx, "Serving DataCatalog Insecure on port %v", cfg.GetGrpcHostAddress())
return grpcServer.Serve(grpcListener)
}

// Creates a new GRPC Server with all the configuration
func newGRPCDummyServer(_ context.Context, cfg *config.Config) *grpc.Server {
grpcServer := grpc.NewServer()
datacatalog.RegisterDataCatalogServer(grpcServer, &datacatalogservice.DataCatalogService{})
if cfg.GrpcServerReflection {
reflection.Register(grpcServer)
}
return grpcServer
}
34 changes: 20 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@ go 1.17
require (
github.com/Selvatico/go-mocket v1.0.7
github.com/flyteorg/flyteidl v0.22.1
github.com/flyteorg/flytestdlib v0.4.7
github.com/flyteorg/flytestdlib v0.4.17
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.3
github.com/jackc/pgconn v1.8.1
github.com/jackc/pgconn v1.10.1
github.com/mitchellh/mapstructure v1.4.1
github.com/satori/go.uuid v1.2.0
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
google.golang.org/grpc v1.36.0
gorm.io/driver/postgres v1.1.0
gorm.io/gorm v1.21.9
gorm.io/driver/postgres v1.2.3
gorm.io/driver/sqlite v1.1.1
gorm.io/gorm v1.22.4
)

require (
cloud.google.com/go v0.75.0 // indirect
cloud.google.com/go/storage v1.12.0 // indirect
github.com/Azure/azure-sdk-for-go v51.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go v62.3.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.17 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.10 // indirect
Expand All @@ -36,29 +40,31 @@ require (
github.com/coocood/freecache v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.10.0 // indirect
github.com/flyteorg/stow v0.3.1 // indirect
github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/graymeta/stow v0.2.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.0.6 // indirect
github.com/jackc/pgproto3/v2 v2.2.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgtype v1.7.0 // indirect
github.com/jackc/pgx/v4 v4.11.0 // indirect
github.com/jackc/pgtype v1.9.0 // indirect
github.com/jackc/pgx/v4 v4.14.0 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.2 // indirect
github.com/jinzhu/now v1.1.4 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/magiconair/properties v1.8.4 // indirect
github.com/mattn/go-colorable v0.1.8 // indirect
github.com/mattn/go-isatty v0.0.12 // indirect
github.com/mattn/go-sqlite3 v1.14.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/ncw/swift v1.0.53 // indirect
github.com/pelletier/go-toml v1.8.1 // indirect
Expand All @@ -76,13 +82,13 @@ require (
github.com/stretchr/objx v0.3.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.opencensus.io v0.22.6 // indirect
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5 // indirect
golang.org/x/mod v0.4.1 // indirect
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110 // indirect
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d // indirect
golang.org/x/oauth2 v0.0.0-20210126194326-f9ce19ea3013 // indirect
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 // indirect
golang.org/x/tools v0.1.0 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
Expand Down
Loading

0 comments on commit b45ed42

Please sign in to comment.