Skip to content

Commit

Permalink
add tests for getRawHistory (cadence-workflow#3469)
Browse files Browse the repository at this point in the history
* add tests for getRawHistory
  • Loading branch information
mkolodezny committed Aug 24, 2020
1 parent c17aa76 commit 37d34ca
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 11 deletions.
114 changes: 114 additions & 0 deletions common/client/versionChecker_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions service/frontend/dcRedirectionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/common/client"

"github.com/uber/cadence/.gen/go/cadence/workflowservicetest"
"github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -91,8 +92,8 @@ func (s *dcRedirectionHandlerSuite) SetupTest() {
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(s.currentClusterName).AnyTimes()
s.mockClusterMetadata.EXPECT().IsGlobalDomainEnabled().Return(true).AnyTimes()

s.config = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNopClient(), s.mockResource.GetLogger()), 0, false)
frontendHandler := NewWorkflowHandler(s.mockResource, s.config, nil)
s.config = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNopClient(), s.mockResource.GetLogger()), 0, false, false)
frontendHandler := NewWorkflowHandler(s.mockResource, s.config, nil, client.NewVersionChecker())

s.mockFrontendHandler = NewMockHandler(s.controller)
s.handler = NewDCRedirectionHandler(frontendHandler, config.DCRedirectionPolicy{})
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/dcRedirectionPolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() {
logger, err := loggerimpl.NewDevelopment()
s.Nil(err)

s.mockConfig = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNopClient(), logger), 0, false)
s.mockConfig = NewConfig(dynamicconfig.NewCollection(dynamicconfig.NewNopClient(), logger), 0, false, false)
s.mockClusterMetadata = &mocks.ClusterMetadata{}
s.mockClusterMetadata.On("IsGlobalDomainEnabled").Return(true)
s.policy = NewSelectedAPIsForwardingPolicy(
Expand Down
10 changes: 6 additions & 4 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"github.com/uber/cadence/common/client"

"github.com/stretchr/testify/mock"

"github.com/uber/cadence/common"
Expand Down Expand Up @@ -99,7 +101,7 @@ type Config struct {
}

// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFromES bool) *Config {
func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFromES bool, sendRawWorkflowHistory bool) *Config {
return &Config{
NumHistoryShards: numHistoryShards,
PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.FrontendPersistenceMaxQPS, 2000),
Expand Down Expand Up @@ -137,7 +139,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableReadFro
MinRetentionDays: dc.GetIntProperty(dynamicconfig.MinRetentionDays, domain.MinRetentionDays),
VisibilityArchivalQueryMaxPageSize: dc.GetIntProperty(dynamicconfig.VisibilityArchivalQueryMaxPageSize, 10000),
DisallowQuery: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.DisallowQuery, false),
SendRawWorkflowHistory: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.SendRawWorkflowHistory, false),
SendRawWorkflowHistory: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.SendRawWorkflowHistory, sendRawWorkflowHistory),
}
}

Expand All @@ -159,7 +161,7 @@ func NewService(
) (resource.Resource, error) {

isAdvancedVisExistInConfig := len(params.PersistenceConfig.AdvancedVisibilityStore) != 0
serviceConfig := NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), params.PersistenceConfig.NumHistoryShards, isAdvancedVisExistInConfig)
serviceConfig := NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), params.PersistenceConfig.NumHistoryShards, isAdvancedVisExistInConfig, false)

params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()
params.PersistenceConfig.VisibilityConfig = &config.VisibilityConfig{
Expand Down Expand Up @@ -244,7 +246,7 @@ func (s *Service) Start() {
replicationMessageSink.(*mocks.KafkaProducer).On("Publish", mock.Anything).Return(nil)
}

wfHandler := NewWorkflowHandler(s, s.config, replicationMessageSink)
wfHandler := NewWorkflowHandler(s, s.config, replicationMessageSink, client.NewVersionChecker())
s.handler = NewDCRedirectionHandler(wfHandler, s.params.DCRedirectionPolicy)
if s.params.Authorizer != nil {
s.handler = NewAccessControlledHandlerImpl(s.handler, s.params.Authorizer)
Expand Down
3 changes: 2 additions & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func NewWorkflowHandler(
resource resource.Resource,
config *Config,
replicationMessageSink messaging.Producer,
versionChecker client.VersionChecker,
) Handler {
return &WorkflowHandler{
Resource: resource,
Expand All @@ -173,7 +174,7 @@ func NewWorkflowHandler(
return float64(config.MaxDomainRPSPerInstance(domain))
},
),
versionChecker: client.NewVersionChecker(),
versionChecker: versionChecker,
domainHandler: domain.NewHandler(
config.MinRetentionDays(),
config.MaxBadBinaries,
Expand Down
79 changes: 76 additions & 3 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package frontend

import (
"context"
"encoding/json"
"errors"
"testing"
"time"
Expand All @@ -31,9 +32,11 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/cadence/common/client"

"github.com/uber/cadence/.gen/go/history/historyservicetest"
"github.com/uber/cadence/.gen/go/shared"
gen "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/archiver"
"github.com/uber/cadence/common/archiver/provider"
Expand All @@ -52,7 +55,7 @@ const (
numHistoryShards = 10

testWorkflowID = "test-workflow-id"
testRunID = "test-run-id"
testRunID = "2c8b555f-1f55-4955-9d1c-b980194555c9"
testHistoryArchivalURI = "testScheme://history/URI"
testVisibilityArchivalURI = "testScheme://visibility/URI"
)
Expand All @@ -77,6 +80,7 @@ type (
mockArchiverProvider *provider.MockArchiverProvider
mockHistoryArchiver *archiver.HistoryArchiverMock
mockVisibilityArchiver *archiver.VisibilityArchiverMock
mockVersionChecker *client.VersionCheckerMock

testDomain string
testDomainID string
Expand Down Expand Up @@ -115,9 +119,11 @@ func (s *workflowHandlerSuite) SetupTest() {
s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil)
s.mockHistoryArchiver = &archiver.HistoryArchiverMock{}
s.mockVisibilityArchiver = &archiver.VisibilityArchiverMock{}
s.mockVersionChecker = client.NewMockVersionChecker(s.controller)

mockMonitor := s.mockResource.MembershipMonitor
mockMonitor.EXPECT().GetMemberCount(common.FrontendServiceName).Return(5, nil).AnyTimes()
s.mockVersionChecker.EXPECT().ClientSupported(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()

}

Expand All @@ -130,7 +136,7 @@ func (s *workflowHandlerSuite) TearDownTest() {
}

func (s *workflowHandlerSuite) getWorkflowHandler(config *Config) *WorkflowHandler {
return NewWorkflowHandler(s.mockResource, config, s.mockProducer).(*WorkflowHandler)
return NewWorkflowHandler(s.mockResource, config, s.mockProducer, s.mockVersionChecker).(*WorkflowHandler)
}

func (s *workflowHandlerSuite) TestDisableListVisibilityByFilter() {
Expand Down Expand Up @@ -1096,6 +1102,73 @@ func (s *workflowHandlerSuite) TestGetSearchAttributes() {
s.NotNil(resp)
}

func (s *workflowHandlerSuite) TestGetWorkflowExecutionHistory__Success__RawHistoryEnabledTransientDecisionEmitted() {
var nextEventID int64 = 5
s.getWorkflowExecutionHistory(5, &shared.TransientDecisionInfo{
StartedEvent: &shared.HistoryEvent{EventId: common.Int64Ptr(nextEventID + 1)},
ScheduledEvent: &shared.HistoryEvent{EventId: common.Int64Ptr(nextEventID)},
}, []*shared.HistoryEvent{{}, {}, {}})
}

func (s *workflowHandlerSuite) TestGetWorkflowExecutionHistory__Success__RawHistoryEnabledNoTransientDecisionEmitted() {
var nextEventID int64 = 5
s.getWorkflowExecutionHistory(5, &shared.TransientDecisionInfo{
StartedEvent: &shared.HistoryEvent{EventId: common.Int64Ptr(nextEventID + 1)},
ScheduledEvent: &shared.HistoryEvent{EventId: common.Int64Ptr(nextEventID)},
}, []*shared.HistoryEvent{{}, {}, {}})
}

func (s *workflowHandlerSuite) getWorkflowExecutionHistory(nextEventID int64, transientDecision *gen.TransientDecisionInfo, historyEvents []*shared.HistoryEvent) {
wh := s.getWorkflowHandler(NewConfig(dc.NewCollection(dc.NewNopClient(), s.mockResource.GetLogger()), numHistoryShards, false, true))
ctx := context.Background()
s.mockDomainCache.EXPECT().GetDomainID(gomock.Any()).Return(s.testDomainID, nil).AnyTimes()
s.mockVersionChecker.EXPECT().SupportsRawHistoryQuery(gomock.Any(), gomock.Any()).Return(nil).Times(1)
blob, _ := wh.GetPayloadSerializer().SerializeBatchEvents(historyEvents, common.EncodingTypeThriftRW)
s.mockHistoryV2Mgr.On("ReadRawHistoryBranch", mock.Anything).Return(&persistence.ReadRawHistoryBranchResponse{
HistoryEventBlobs: []*persistence.DataBlob{blob},
NextPageToken: []byte{},
}, nil).Once()
token, _ := json.Marshal(&getHistoryContinuationToken{
FirstEventID: 1,
NextEventID: nextEventID,
RunID: testRunID,
TransientDecision: transientDecision,
})
resp, err := wh.GetWorkflowExecutionHistory(ctx, &shared.GetWorkflowExecutionHistoryRequest{
Domain: common.StringPtr(s.testDomain),
Execution: &shared.WorkflowExecution{
WorkflowId: common.StringPtr(testWorkflowID),
RunId: common.StringPtr(testRunID),
},
SkipArchival: common.BoolPtr(true),
NextPageToken: token,
})
s.NoError(err)
s.NotNil(resp)
s.NotNil(resp.RawHistory)
s.Equal(2, len(resp.RawHistory))

events := deserializeBlobDataToHistoryEvents(wh, resp.RawHistory)
s.NotNil(events)
if transientDecision != nil {
s.Equal(len(historyEvents)+2, len(events))
} else {
s.Equal(len(historyEvents), len(events))
}
}

func deserializeBlobDataToHistoryEvents(wh *WorkflowHandler, dataBlobs []*shared.DataBlob) []*shared.HistoryEvent {
var historyEvents []*shared.HistoryEvent
for _, batch := range dataBlobs {
events, err := wh.GetPayloadSerializer().DeserializeBatchEvents(&persistence.DataBlob{Data: batch.Data, Encoding: common.EncodingTypeThriftRW})
if err != nil {
return nil
}
historyEvents = append(historyEvents, events...)
}
return historyEvents
}

func (s *workflowHandlerSuite) TestListWorkflowExecutions() {
config := s.newConfig()
wh := s.getWorkflowHandler(config)
Expand Down Expand Up @@ -1279,7 +1352,7 @@ func (s *workflowHandlerSuite) TestVerifyHistoryIsComplete() {
}

func (s *workflowHandlerSuite) newConfig() *Config {
return NewConfig(dc.NewCollection(dc.NewNopClient(), s.mockResource.GetLogger()), numHistoryShards, false)
return NewConfig(dc.NewCollection(dc.NewNopClient(), s.mockResource.GetLogger()), numHistoryShards, false, false)
}

func updateRequest(
Expand Down

0 comments on commit 37d34ca

Please sign in to comment.