From 473c5241936127b547a3cc7e54c4ae24209b2f1f Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Tue, 27 Mar 2018 17:01:54 -0700 Subject: [PATCH] Add retry in some frontend API (#631) --- common/util.go | 18 ++++++++++++ service/frontend/handler.go | 53 ++++++++++++++++++++++++++---------- service/history/timerGate.go | 2 +- 3 files changed, 57 insertions(+), 16 deletions(-) diff --git a/common/util.go b/common/util.go index c929bc20214..f596d9a63ae 100644 --- a/common/util.go +++ b/common/util.go @@ -42,6 +42,10 @@ const ( historyServiceOperationInitialInterval = 50 * time.Millisecond historyServiceOperationMaxInterval = 10 * time.Second historyServiceOperationExpirationInterval = 30 * time.Second + + frontendServiceOperationInitialInterval = 200 * time.Millisecond + frontendServiceOperationMaxInterval = 5 * time.Second + frontendServiceOperationExpirationInterval = 15 * time.Second ) // MergeDictoRight copies the contents of src to dest @@ -103,6 +107,15 @@ func CreateHistoryServiceRetryPolicy() backoff.RetryPolicy { return policy } +// CreateFrontendServiceRetryPolicy creates a retry policy for calls to frontend service +func CreateFrontendServiceRetryPolicy() backoff.RetryPolicy { + policy := backoff.NewExponentialRetryPolicy(frontendServiceOperationInitialInterval) + policy.SetMaximumInterval(frontendServiceOperationMaxInterval) + policy.SetExpirationInterval(frontendServiceOperationExpirationInterval) + + return policy +} + // IsPersistenceTransientError checks if the error is a transient persistence error func IsPersistenceTransientError(err error) bool { switch err.(type) { @@ -113,6 +126,11 @@ func IsPersistenceTransientError(err error) bool { return false } +// IsServiceTransientError checks if the error is a retryable error. +func IsServiceTransientError(err error) bool { + return !IsServiceNonRetryableError(err) +} + // IsServiceNonRetryableError checks if the error is a non retryable error. func IsServiceNonRetryableError(err error) bool { switch err.(type) { diff --git a/service/frontend/handler.go b/service/frontend/handler.go index bc037b91ade..7d874554532 100644 --- a/service/frontend/handler.go +++ b/service/frontend/handler.go @@ -42,6 +42,7 @@ import ( "github.com/uber/cadence/client/history" "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/client" "github.com/uber/cadence/common/logging" @@ -105,6 +106,8 @@ var ( errCannotRemoveClustersFromDomain = &gen.BadRequestError{Message: "Cannot remove existing replicated clusters from a domain."} errActiveClusterNotInClusters = &gen.BadRequestError{Message: "Active cluster is not contained in all clusters."} errCannotDoDomainFailoverAndUpdate = &gen.BadRequestError{Message: "Cannot set active cluster to current cluster when other paramaters are set."} + + frontendServiceRetryPolicy = common.CreateFrontendServiceRetryPolicy() ) // NewWorkflowHandler creates a thrift handler for the cadence service @@ -534,11 +537,18 @@ func (wh *WorkflowHandler) PollForActivityTask( } pollerID := uuid.New() - resp, err := wh.matching.PollForActivityTask(ctx, &m.PollForActivityTaskRequest{ - DomainUUID: common.StringPtr(domainID), - PollerID: common.StringPtr(pollerID), - PollRequest: pollRequest, - }) + var resp *gen.PollForActivityTaskResponse + op := func() error { + var err error + resp, err = wh.matching.PollForActivityTask(ctx, &m.PollForActivityTaskRequest{ + DomainUUID: common.StringPtr(domainID), + PollerID: common.StringPtr(pollerID), + PollRequest: pollRequest, + }) + return err + } + + err = backoff.Retry(op, frontendServiceRetryPolicy, common.IsServiceTransientError) if err != nil { err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeActivity, pollRequest.TaskList, pollerID) if err != nil { @@ -582,11 +592,18 @@ func (wh *WorkflowHandler) PollForDecisionTask( wh.Service.GetLogger().Debugf("Poll for decision. DomainName: %v, DomainID: %v", domainName, domainID) pollerID := uuid.New() - matchingResp, err := wh.matching.PollForDecisionTask(ctx, &m.PollForDecisionTaskRequest{ - DomainUUID: common.StringPtr(domainID), - PollerID: common.StringPtr(pollerID), - PollRequest: pollRequest, - }) + var matchingResp *m.PollForDecisionTaskResponse + op := func() error { + var err error + matchingResp, err = wh.matching.PollForDecisionTask(ctx, &m.PollForDecisionTaskRequest{ + DomainUUID: common.StringPtr(domainID), + PollerID: common.StringPtr(pollerID), + PollRequest: pollRequest, + }) + return err + } + + err = backoff.Retry(op, frontendServiceRetryPolicy, common.IsServiceTransientError) if err != nil { err = wh.cancelOutstandingPoll(ctx, err, domainID, persistence.TaskListTypeDecision, pollRequest.TaskList, pollerID) if err != nil { @@ -619,7 +636,7 @@ func (wh *WorkflowHandler) cancelOutstandingPoll(ctx context.Context, err error, TaskList: taskList, PollerID: common.StringPtr(pollerID), }) - // We can do much if this call fails. Just log the error and move on + // We can not do much if this call fails. Just log the error and move on if err != nil { wh.Service.GetLogger().Warnf("Failed to cancel outstanding poller. Tasklist: %v, Error: %v,", taskList.GetName(), err) @@ -1769,11 +1786,17 @@ func (wh *WorkflowHandler) DescribeTaskList(ctx context.Context, request *gen.De return nil, err } - response, err := wh.matching.DescribeTaskList(ctx, &m.DescribeTaskListRequest{ - DomainUUID: common.StringPtr(domainID), - DescRequest: request, - }) + var response *gen.DescribeTaskListResponse + op := func() error { + var err error + response, err = wh.matching.DescribeTaskList(ctx, &m.DescribeTaskListRequest{ + DomainUUID: common.StringPtr(domainID), + DescRequest: request, + }) + return err + } + err = backoff.Retry(op, frontendServiceRetryPolicy, common.IsServiceTransientError) if err != nil { return nil, wh.error(err, scope) } diff --git a/service/history/timerGate.go b/service/history/timerGate.go index 6b044c6c66f..981443ff6f8 100644 --- a/service/history/timerGate.go +++ b/service/history/timerGate.go @@ -40,7 +40,7 @@ type ( } // TimerGateImpl is an timer implementation, - // which basically is an wrrapper of golang's timer and + // which basically is an wrapper of golang's timer and // additional feature TimerGateImpl struct { // the channel which will be used to proxy the fired timer