Skip to content

Commit

Permalink
Add debug logs for terminated, scheduled workflow executor (flyteorg#92)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored Apr 17, 2020
1 parent 7d179a7 commit 6a55199
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 2 deletions.
2 changes: 1 addition & 1 deletion boilerplate/lyft/golang_support_tools/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
package tools

import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/lyft/flytestdlib/cli/pflags"
_ "github.com/vektra/mockery/cmd/mockery"
_ "github.com/alvaroloes/enumer"
)
2 changes: 2 additions & 0 deletions pkg/async/schedule/aws/workflow_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ func (e *workflowExecutor) Run() {
e.metrics.MessageReceivedDelay.Observe(ctx, scheduledWorkflowExecutionRequest.KickoffTime,
observedMessageTriggeredTime)
}
err := e.subscriber.Err()
logger.Errorf(context.TODO(), "Gizmo subscriber closed channel with err: [%+v]", err)
}

func (e *workflowExecutor) Stop() error {
Expand Down
14 changes: 13 additions & 1 deletion pkg/rpc/adminservice/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"runtime/debug"
"time"

"github.com/lyft/flyteadmin/pkg/manager/impl/resources"

Expand Down Expand Up @@ -135,8 +136,19 @@ func NewAdminServer(kubeConfig, master string) *AdminService {
scheduledWorkflowExecutor := workflowScheduler.GetWorkflowExecutor(executionManager, launchPlanManager)
logger.Info(context.Background(), "Successfully initialized a new scheduled workflow executor")
go func() {
logger.Info(context.Background(), "Starting the scheduled workflow executor")
scheduledWorkflowExecutor.Run()
logger.Info(context.Background(), "Successfully started running the scheduled workflow executor")

maxReconnectAttempts := configuration.ApplicationConfiguration().GetSchedulerConfig().
WorkflowExecutorConfig.ReconnectAttempts
reconnectDelay := time.Duration(configuration.ApplicationConfiguration().GetSchedulerConfig().
WorkflowExecutorConfig.ReconnectDelaySeconds) * time.Second
for reconnectAttempt := 0; reconnectAttempt < maxReconnectAttempts; reconnectAttempt++ {
time.Sleep(reconnectDelay)
logger.Warningf(context.Background(),
"Restarting scheduled workflow executor, attempt %d of %d", reconnectAttempt, maxReconnectAttempts)
scheduledWorkflowExecutor.Run()
}
}()

// Serve profiling endpoints.
Expand Down
4 changes: 4 additions & 0 deletions pkg/runtime/interfaces/application_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ type WorkflowExecutorConfig struct {
// The account id (according to whichever cloud provider scheme is used) that has permission to read from the above
// queue.
AccountID string `json:"accountId"`
// Specifies the number of times to attempt recreating a workflow executor client should there be any disruptions.
ReconnectAttempts int `json:"reconnectAttempts"`
// Specifies the time interval to wait before attempting to reconnect the workflow executor client.
ReconnectDelaySeconds int `json:"reconnectDelaySeconds"`
}

// This configuration is the base configuration for all scheduler-related set-up.
Expand Down

0 comments on commit 6a55199

Please sign in to comment.