Skip to content

Commit

Permalink
Added profiler configuration for scheduler and starting it (flyteorg#261
Browse files Browse the repository at this point in the history
)
  • Loading branch information
pmahindrakar-oss authored Sep 23, 2021
1 parent 2a5638b commit 8c2d33c
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 4 deletions.
15 changes: 13 additions & 2 deletions flyteadmin/cmd/scheduler/entrypoints/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ import (
"github.com/flyteorg/flyteadmin/pkg/common"
repositoryCommonConfig "github.com/flyteorg/flyteadmin/pkg/repositories/config"
"github.com/flyteorg/flyteadmin/pkg/runtime"
scheduler "github.com/flyteorg/flyteadmin/scheduler"
"github.com/flyteorg/flyteadmin/scheduler"
schdulerRepoConfig "github.com/flyteorg/flyteadmin/scheduler/repositories"
"github.com/flyteorg/flyteidl/clients/go/admin"
"github.com/flyteorg/flytestdlib/contextutils"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/profutils"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"

Expand All @@ -27,6 +28,7 @@ var schedulerRunCmd = &cobra.Command{
ctx := context.Background()
configuration := runtime.NewConfigurationProvider()
applicationConfiguration := configuration.ApplicationConfiguration().GetTopLevelConfig()
schedulerConfiguration := configuration.ApplicationConfiguration().GetSchedulerConfig()

// Define the schedulerScope for prometheus metrics
schedulerScope := promutils.NewScope(applicationConfiguration.MetricsScope).NewSubScope("flytescheduler")
Expand Down Expand Up @@ -54,7 +56,16 @@ var schedulerRunCmd = &cobra.Command{
scheduleExecutor := scheduler.NewScheduledExecutor(db,
configuration.ApplicationConfiguration().GetSchedulerConfig().GetWorkflowExecutorConfig(), schedulerScope, adminServiceClient)

logger.Info(context.Background(), "Successfully initialized a native flyte scheduler")
logger.Info(ctx, "Successfully initialized a native flyte scheduler")

// Serve profiling endpoints.
go func() {
err := profutils.StartProfilingServerWithDefaultHandlers(
ctx, schedulerConfiguration.ProfilerPort.Port, nil)
if err != nil {
logger.Panicf(ctx, "Failed to Start profiling and Metrics server. Error, %v", err)
}
}()

err = scheduleExecutor.Run(ctx)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, err
}
signedInputsURLBlob := admin.UrlBlob{}
if nodeExecution.InputUri != "" {
if len(nodeExecution.InputUri) != 0 {
signedInputsURLBlob, err = m.urlData.Get(ctx, nodeExecution.InputUri)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions flyteadmin/pkg/runtime/application_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var flyteAdminConfig = config.MustRegisterSection(flyteAdmin, &interfaces.Applic
AsyncEventsBufferSize: 100,
})
var schedulerConfig = config.MustRegisterSection(scheduler, &interfaces.SchedulerConfig{
ProfilerPort: config.Port{Port: 10253},
EventSchedulerConfig: interfaces.EventSchedulerConfig{
Scheme: common.Local,
FlyteSchedulerConfig: &interfaces.FlyteSchedulerConfig{},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package interfaces

import "golang.org/x/time/rate"
import (
"github.com/flyteorg/flytestdlib/config"
"golang.org/x/time/rate"
)

// This configuration section is used to for initiating the database connection with the store that holds registered
// entities (e.g. workflows, tasks, launch plans...)
Expand Down Expand Up @@ -232,6 +235,8 @@ func (f *AdminRateLimit) GetBurst() int {

// This configuration is the base configuration for all scheduler-related set-up.
type SchedulerConfig struct {
// Determines which port the profiling server used for scheduler monitoring and application debugging uses.
ProfilerPort config.Port `json:"profilerPort"`
EventSchedulerConfig EventSchedulerConfig `json:"eventScheduler"`
WorkflowExecutorConfig WorkflowExecutorConfig `json:"workflowExecutor"`
// Specifies the number of times to attempt recreating a workflow executor client should there be any disruptions.
Expand Down

0 comments on commit 8c2d33c

Please sign in to comment.