diff --git a/boilerplate/lyft/golang_support_tools/tools.go b/boilerplate/lyft/golang_support_tools/tools.go index 4310b39d7..88ff64523 100644 --- a/boilerplate/lyft/golang_support_tools/tools.go +++ b/boilerplate/lyft/golang_support_tools/tools.go @@ -3,8 +3,8 @@ package tools import ( + _ "github.com/alvaroloes/enumer" _ "github.com/golangci/golangci-lint/cmd/golangci-lint" _ "github.com/lyft/flytestdlib/cli/pflags" _ "github.com/vektra/mockery/cmd/mockery" - _ "github.com/alvaroloes/enumer" ) diff --git a/pkg/async/schedule/aws/workflow_executor.go b/pkg/async/schedule/aws/workflow_executor.go index d1cfe1f00..9934a432e 100644 --- a/pkg/async/schedule/aws/workflow_executor.go +++ b/pkg/async/schedule/aws/workflow_executor.go @@ -242,6 +242,8 @@ func (e *workflowExecutor) Run() { e.metrics.MessageReceivedDelay.Observe(ctx, scheduledWorkflowExecutionRequest.KickoffTime, observedMessageTriggeredTime) } + err := e.subscriber.Err() + logger.Errorf(context.TODO(), "Gizmo subscriber closed channel with err: [%+v]", err) } func (e *workflowExecutor) Stop() error { diff --git a/pkg/rpc/adminservice/base.go b/pkg/rpc/adminservice/base.go index 1ebc1969b..7bd9ee381 100644 --- a/pkg/rpc/adminservice/base.go +++ b/pkg/rpc/adminservice/base.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "runtime/debug" + "time" "github.com/lyft/flyteadmin/pkg/manager/impl/resources" @@ -135,8 +136,19 @@ func NewAdminServer(kubeConfig, master string) *AdminService { scheduledWorkflowExecutor := workflowScheduler.GetWorkflowExecutor(executionManager, launchPlanManager) logger.Info(context.Background(), "Successfully initialized a new scheduled workflow executor") go func() { + logger.Info(context.Background(), "Starting the scheduled workflow executor") scheduledWorkflowExecutor.Run() - logger.Info(context.Background(), "Successfully started running the scheduled workflow executor") + + maxReconnectAttempts := configuration.ApplicationConfiguration().GetSchedulerConfig(). + WorkflowExecutorConfig.ReconnectAttempts + reconnectDelay := time.Duration(configuration.ApplicationConfiguration().GetSchedulerConfig(). + WorkflowExecutorConfig.ReconnectDelaySeconds) * time.Second + for reconnectAttempt := 0; reconnectAttempt < maxReconnectAttempts; reconnectAttempt++ { + time.Sleep(reconnectDelay) + logger.Warningf(context.Background(), + "Restarting scheduled workflow executor, attempt %d of %d", reconnectAttempt, maxReconnectAttempts) + scheduledWorkflowExecutor.Run() + } }() // Serve profiling endpoints. diff --git a/pkg/runtime/interfaces/application_configuration.go b/pkg/runtime/interfaces/application_configuration.go index b67ff1cc5..b1dfaf695 100644 --- a/pkg/runtime/interfaces/application_configuration.go +++ b/pkg/runtime/interfaces/application_configuration.go @@ -74,6 +74,10 @@ type WorkflowExecutorConfig struct { // The account id (according to whichever cloud provider scheme is used) that has permission to read from the above // queue. AccountID string `json:"accountId"` + // Specifies the number of times to attempt recreating a workflow executor client should there be any disruptions. + ReconnectAttempts int `json:"reconnectAttempts"` + // Specifies the time interval to wait before attempting to reconnect the workflow executor client. + ReconnectDelaySeconds int `json:"reconnectDelaySeconds"` } // This configuration is the base configuration for all scheduler-related set-up.