From dccf8883c532b9f1ab8a66ee7659f4cbae800503 Mon Sep 17 00:00:00 2001 From: arthurguber <38018478+arthurguber@users.noreply.github.com> Date: Thu, 7 Jun 2018 18:46:13 -0700 Subject: [PATCH] Add ActivityScheduleTimeout deduction logic (#822) This commit relaxes validateActivityScheduleAttributes by having it fill in some of the fields related to schedule timeouts if they are empty. The logic used is identical to the logic used in AWS SWF's java client. The logic is as follows: If ScheduleToClose is present and ScheduleToStart or StartToClose is missing, we will set what is missing to ScheduleToClose. If ScheduleToClose is missing but both ScheduleToStart and StartToClose are present, we will set ScheduleToClose to the sum of ScheduleToStart and StartToClose. --- service/history/historyEngine.go | 36 +++++--- service/history/historyEngine_test.go | 124 ++++++++++++++++++++++++++ 2 files changed, 148 insertions(+), 12 deletions(-) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 90646c0ad34..2585e214a9d 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -2373,18 +2373,6 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas return &workflow.BadRequestError{Message: "ActivityType is not set on decision."} } - if attributes.StartToCloseTimeoutSeconds == nil || *attributes.StartToCloseTimeoutSeconds <= 0 { - return &workflow.BadRequestError{Message: "A valid StartToCloseTimeoutSeconds is not set on decision."} - } - if attributes.ScheduleToStartTimeoutSeconds == nil || *attributes.ScheduleToStartTimeoutSeconds <= 0 { - return &workflow.BadRequestError{Message: "A valid ScheduleToStartTimeoutSeconds is not set on decision."} - } - if attributes.ScheduleToCloseTimeoutSeconds == nil || *attributes.ScheduleToCloseTimeoutSeconds <= 0 { - return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeoutSeconds is not set on decision."} - } - if attributes.HeartbeatTimeoutSeconds == nil || *attributes.HeartbeatTimeoutSeconds < 0 { - return &workflow.BadRequestError{Message: "A valid HeartbeatTimeoutSeconds is not set on decision."} - } if policy := attributes.RetryPolicy; policy != nil { if policy.GetInitialIntervalInSeconds() <= 0 { return &workflow.BadRequestError{Message: "A valid InitialIntervalInSeconds is not set on retry policy."} @@ -2397,6 +2385,30 @@ func validateActivityScheduleAttributes(attributes *workflow.ScheduleActivityTas } } + // Only attempt to deduce and fill in unspecified timeouts only when all timeouts are non-negative. + if attributes.GetScheduleToCloseTimeoutSeconds() < 0 || attributes.GetScheduleToStartTimeoutSeconds() < 0 || + attributes.GetStartToCloseTimeoutSeconds() < 0 || attributes.GetHeartbeatTimeoutSeconds() < 0 { + return &workflow.BadRequestError{Message: "A valid timeout may not be negative."} + } + + validScheduleToClose := attributes.GetScheduleToCloseTimeoutSeconds() > 0 + validScheduleToStart := attributes.GetScheduleToStartTimeoutSeconds() > 0 + validStartToClose := attributes.GetStartToCloseTimeoutSeconds() > 0 + + if validScheduleToClose { + if !validScheduleToStart { + attributes.ScheduleToStartTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds()) + } + if !validStartToClose { + attributes.StartToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToCloseTimeoutSeconds()) + } + } else if validScheduleToStart && validStartToClose { + attributes.ScheduleToCloseTimeoutSeconds = common.Int32Ptr(attributes.GetScheduleToStartTimeoutSeconds() + attributes.GetStartToCloseTimeoutSeconds()) + } else { + // Deduction failed as there's not enough information to fill in missing timeouts. + return &workflow.BadRequestError{Message: "A valid ScheduleToCloseTimeout is not set on decision."} + } + return nil } diff --git a/service/history/historyEngine_test.go b/service/history/historyEngine_test.go index b8a350459d0..b6520dcba05 100644 --- a/service/history/historyEngine_test.go +++ b/service/history/historyEngine_test.go @@ -1058,6 +1058,130 @@ func (s *engineSuite) TestRespondDecisionTaskCompletedBadDecisionAttributes() { s.IsType(&workflow.BadRequestError{}, err) } +// This test unit tests the activity schedule timeout validation logic of HistoryEngine's RespondDecisionTaskComplete function. +// An scheduled activity decision has 3 timeouts: ScheduleToClose, ScheduleToStart and StartToClose. +// This test verifies that when either ScheduleToClose or ScheduleToStart and StartToClose are specified, +// HistoryEngine's validateActivityScheduleAttribute will deduce the missing timeout and fill it in +// instead of returning a BadRequest error and only when all three are missing should a BadRequest be returned. +func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledAttribute() { + testIterationVariables := []struct { + scheduleToClose *int32 + scheduleToStart *int32 + startToClose *int32 + heartbeat *int32 + expectedScheduleToClose int32 + expectedScheduleToStart int32 + expectedStartToClose int32 + expectError bool + }{ + // No ScheduleToClose timeout, will use ScheduleToStart + StartToClose + {nil, common.Int32Ptr(3), common.Int32Ptr(7), nil, + 3 + 7, 3, 7, false}, + // Has ScheduleToClose timeout but not ScheduleToStart or StartToClose, + // will use ScheduleToClose for ScheduleToStart and StartToClose + {common.Int32Ptr(7), nil, nil, nil, + 7, 7, 7, false}, + // No ScheduleToClose timeout, ScheduleToStart or StartToClose, expect error return + {nil, nil, nil, nil, + 0, 0, 0, true}, + // Negative ScheduleToClose, expect error return + {common.Int32Ptr(-1), nil, nil, nil, + 0, 0, 0, true}, + // Negative ScheduleToStart, expect error return + {nil, common.Int32Ptr(-1), nil, nil, + 0, 0, 0, true}, + // Negative StartToClose, expect error return + {nil, nil, common.Int32Ptr(-1), nil, + 0, 0, 0, true}, + // Negative HeartBeat, expect error return + {nil, nil, nil, common.Int32Ptr(-1), + 0, 0, 0, true}, + } + + for _, iVar := range testIterationVariables { + domainID := validDomainID + we := workflow.WorkflowExecution{ + WorkflowId: common.StringPtr("wId"), + RunId: common.StringPtr(validRunID), + } + tl := "testTaskList" + taskToken, _ := json.Marshal(&common.TaskToken{ + WorkflowID: "wId", + RunID: we.GetRunId(), + ScheduleID: 2, + }) + identity := "testIdentity" + executionContext := []byte("context") + input := []byte("input") + + msBuilder := newMutableStateBuilder(s.config, bark.NewLoggerFromLogrus(log.New())) + addWorkflowExecutionStartedEvent(msBuilder, we, "wType", tl, []byte("input"), 100, 200, identity) + di := addDecisionTaskScheduledEvent(msBuilder) + addDecisionTaskStartedEvent(msBuilder, di.ScheduleID, tl, identity) + + decisions := []*workflow.Decision{{ + DecisionType: common.DecisionTypePtr(workflow.DecisionTypeScheduleActivityTask), + ScheduleActivityTaskDecisionAttributes: &workflow.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("activity1"), + ActivityType: &workflow.ActivityType{Name: common.StringPtr("activity_type1")}, + TaskList: &workflow.TaskList{Name: &tl}, + Input: input, + ScheduleToCloseTimeoutSeconds: iVar.scheduleToClose, + ScheduleToStartTimeoutSeconds: iVar.scheduleToStart, + StartToCloseTimeoutSeconds: iVar.startToClose, + HeartbeatTimeoutSeconds: iVar.heartbeat, + }, + }} + + ms := createMutableState(msBuilder) + gwmsResponse := &persistence.GetWorkflowExecutionResponse{State: ms} + + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything).Return(gwmsResponse, nil).Once() + + if !iVar.expectError { + s.mockHistoryMgr.On("AppendHistoryEvents", mock.Anything).Return(nil).Once() + s.mockExecutionMgr.On("UpdateWorkflowExecution", mock.Anything).Return(nil).Once() + } + + s.mockMetadataMgr.On("GetDomain", mock.Anything).Return( + &persistence.GetDomainResponse{ + Info: &persistence.DomainInfo{ID: domainID}, + Config: &persistence.DomainConfig{Retention: 1}, + ReplicationConfig: &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + &persistence.ClusterReplicationConfig{ClusterName: cluster.TestCurrentClusterName}, + }, + }, + TableVersion: persistence.DomainTableVersionV1, + }, + nil, + ) + _, err := s.mockHistoryEngine.RespondDecisionTaskCompleted(context.Background(), &history.RespondDecisionTaskCompletedRequest{ + DomainUUID: common.StringPtr(domainID), + CompleteRequest: &workflow.RespondDecisionTaskCompletedRequest{ + TaskToken: taskToken, + Decisions: decisions, + ExecutionContext: executionContext, + Identity: &identity, + }, + }) + + if !iVar.expectError { + s.Nil(err, s.printHistory(msBuilder)) + executionBuilder := s.getBuilder(domainID, we) + activity1Attributes := s.getActivityScheduledEvent(executionBuilder, int64(5)).ActivityTaskScheduledEventAttributes + s.Equal(iVar.expectedScheduleToClose, activity1Attributes.GetScheduleToCloseTimeoutSeconds()) + s.Equal(iVar.expectedScheduleToStart, activity1Attributes.GetScheduleToStartTimeoutSeconds()) + s.Equal(iVar.expectedStartToClose, activity1Attributes.GetStartToCloseTimeoutSeconds()) + } else { + s.NotNil(err) + } + s.TearDownTest() + s.SetupTest() + } +} + func (s *engineSuite) TestRespondDecisionTaskCompletedSingleActivityScheduledDecision() { domainID := validDomainID we := workflow.WorkflowExecution{