Skip to content

Commit

Permalink
Fix ES client config NPE (#3794)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Nov 24, 2020
1 parent a64c8c5 commit a201ed7
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cmd/server/cadence/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (s *server) startService() common.Daemon {

params.ESConfig = advancedVisStore.ElasticSearch
params.ESConfig.SetUsernamePassword()
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, s.cfg.Persistence.VisibilityConfig, params.Logger)
esClient, err := elasticsearch.NewGenericClient(params.ESConfig, params.Logger)
if err != nil {
log.Fatalf("error creating elastic search client: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion common/domain/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)

Expand Down
18 changes: 10 additions & 8 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type (
// elasticV6 implements Client
elasticV6 struct {
client *elastic.Client
config *config.VisibilityConfig
logger log.Logger
serializer p.PayloadSerializer
}
Expand Down Expand Up @@ -86,7 +85,6 @@ func (c *elasticV6) IsNotFoundError(err error) bool {
// NewV6Client returns a new implementation of GenericClient
func NewV6Client(
connectConfig *config.ElasticSearchConfig,
visibilityConfig *config.VisibilityConfig,
logger log.Logger,
clientOptFuncs ...elastic.ClientOptionFunc,
) (GenericClient, error) {
Expand All @@ -105,7 +103,6 @@ func NewV6Client(

return &elasticV6{
client: client,
config: visibilityConfig,
logger: logger,
serializer: p.NewPayloadSerializer(),
}, nil
Expand Down Expand Up @@ -152,7 +149,7 @@ func (c *elasticV6) Search(ctx context.Context, request *SearchRequest) (*p.Inte
return nil, err
}

return c.getListWorkflowExecutionsResponse(searchResult.Hits, token, request.ListRequest.PageSize, request.Filter)
return c.getListWorkflowExecutionsResponse(searchResult.Hits, token, request.ListRequest.PageSize, request.MaxResultWindow, request.Filter)
}

func (c *elasticV6) SearchByQuery(ctx context.Context, request *SearchByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
Expand All @@ -166,7 +163,7 @@ func (c *elasticV6) SearchByQuery(ctx context.Context, request *SearchByQueryReq
return nil, err
}

return c.getListWorkflowExecutionsResponse(searchResult.Hits, token, request.PageSize, request.Filter)
return c.getListWorkflowExecutionsResponse(searchResult.Hits, token, request.PageSize, request.MaxResultWindow, request.Filter)
}

func (c *elasticV6) ScanByQuery(ctx context.Context, request *ScanByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
Expand Down Expand Up @@ -499,8 +496,13 @@ func buildPutMappingBodyV6(root, key, valueType string) map[string]interface{} {
return body
}

func (c *elasticV6) getListWorkflowExecutionsResponse(searchHits *elastic.SearchHits,
token *ElasticVisibilityPageToken, pageSize int, isRecordValid func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool) (*p.InternalListWorkflowExecutionsResponse, error) {
func (c *elasticV6) getListWorkflowExecutionsResponse(
searchHits *elastic.SearchHits,
token *ElasticVisibilityPageToken,
pageSize int,
maxResultWindow int,
isRecordValid func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool,
) (*p.InternalListWorkflowExecutionsResponse, error) {

response := &p.InternalListWorkflowExecutionsResponse{}
actualHits := searchHits.Hits
Expand All @@ -522,7 +524,7 @@ func (c *elasticV6) getListWorkflowExecutionsResponse(searchHits *elastic.Search

// ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold
// to retrieve deeper pages, use ES SearchAfter
if searchHits.TotalHits <= int64(c.config.ESIndexMaxResultWindow()-pageSize) { // use ES Search From+Size
if searchHits.TotalHits <= int64(maxResultWindow-pageSize) { // use ES Search From+Size
nextPageToken, err = SerializePageToken(&ElasticVisibilityPageToken{From: token.From + numOfActualHits})
} else { // use ES Search After
var sortVal interface{}
Expand Down
18 changes: 10 additions & 8 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ type (
// elasticV7 implements Client
elasticV7 struct {
client *elastic.Client
config *config.VisibilityConfig
logger log.Logger
serializer p.PayloadSerializer
}
Expand Down Expand Up @@ -79,7 +78,6 @@ type (
// NewV7Client returns a new implementation of GenericClient
func NewV7Client(
connectConfig *config.ElasticSearchConfig,
visibilityConfig *config.VisibilityConfig,
logger log.Logger,
clientOptFuncs ...elastic.ClientOptionFunc,
) (GenericClient, error) {
Expand All @@ -98,7 +96,6 @@ func NewV7Client(

return &elasticV7{
client: client,
config: visibilityConfig,
logger: logger,
serializer: p.NewPayloadSerializer(),
}, nil
Expand Down Expand Up @@ -152,7 +149,7 @@ func (c *elasticV7) Search(ctx context.Context, request *SearchRequest) (*p.Inte
return nil, err
}

return c.getListWorkflowExecutionsResponse(searchResult.Hits, token, request.ListRequest.PageSize, request.Filter)
return c.getListWorkflowExecutionsResponse(searchResult.Hits, token, request.ListRequest.PageSize, request.MaxResultWindow, request.Filter)
}

func (c *elasticV7) SearchByQuery(ctx context.Context, request *SearchByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
Expand All @@ -166,7 +163,7 @@ func (c *elasticV7) SearchByQuery(ctx context.Context, request *SearchByQueryReq
return nil, err
}

return c.getListWorkflowExecutionsResponse(searchResult.Hits, token, request.PageSize, request.Filter)
return c.getListWorkflowExecutionsResponse(searchResult.Hits, token, request.PageSize, request.MaxResultWindow, request.Filter)
}

func (c *elasticV7) ScanByQuery(ctx context.Context, request *ScanByQueryRequest) (*p.InternalListWorkflowExecutionsResponse, error) {
Expand Down Expand Up @@ -499,8 +496,13 @@ func buildPutMappingBodyV7(root, key, valueType string) map[string]interface{} {
return body
}

func (c *elasticV7) getListWorkflowExecutionsResponse(searchHits *elastic.SearchHits,
token *ElasticVisibilityPageToken, pageSize int, isRecordValid func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool) (*p.InternalListWorkflowExecutionsResponse, error) {
func (c *elasticV7) getListWorkflowExecutionsResponse(
searchHits *elastic.SearchHits,
token *ElasticVisibilityPageToken,
pageSize int,
maxResultWindow int,
isRecordValid func(rec *p.InternalVisibilityWorkflowExecutionInfo) bool,
) (*p.InternalListWorkflowExecutionsResponse, error) {

response := &p.InternalListWorkflowExecutionsResponse{}
actualHits := searchHits.Hits
Expand All @@ -522,7 +524,7 @@ func (c *elasticV7) getListWorkflowExecutionsResponse(searchHits *elastic.Search

// ES Search API support pagination using From and PageSize, but has limit that From+PageSize cannot exceed a threshold
// to retrieve deeper pages, use ES SearchAfter
if searchHits.TotalHits.Value <= int64(c.config.ESIndexMaxResultWindow()-pageSize) { // use ES Search From+Size
if searchHits.TotalHits.Value <= int64(maxResultWindow-pageSize) { // use ES Search From+Size
nextPageToken, err = SerializePageToken(&ElasticVisibilityPageToken{From: token.From + numOfActualHits})
} else { // use ES Search After
var sortVal interface{}
Expand Down
27 changes: 14 additions & 13 deletions common/elasticsearch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,16 @@ import (
// NewGenericClient create a ES client
func NewGenericClient(
connectConfig *config.ElasticSearchConfig,
visibilityConfig *config.VisibilityConfig,
logger log.Logger,
) (GenericClient, error) {
if connectConfig.Version == "" {
connectConfig.Version = "v6"
}
switch connectConfig.Version {
case "v6":
return NewV6Client(connectConfig, visibilityConfig, logger)
return NewV6Client(connectConfig, logger)
case "v7":
return NewV7Client(connectConfig, visibilityConfig, logger)
return NewV7Client(connectConfig, logger)
default:
return nil, fmt.Errorf("not supported ElasticSearch version: %v", connectConfig.Version)
}
Expand Down Expand Up @@ -80,11 +79,12 @@ type (

// SearchRequest is request for Search
SearchRequest struct {
Index string
ListRequest *p.InternalListWorkflowExecutionsRequest
IsOpen bool
Filter IsRecordValidFilter
MatchQuery *GenericMatch
Index string
ListRequest *p.InternalListWorkflowExecutionsRequest
IsOpen bool
Filter IsRecordValidFilter
MatchQuery *GenericMatch
MaxResultWindow int
}

// GenericMatch is a match struct
Expand All @@ -95,11 +95,12 @@ type (

// SearchByQueryRequest is request for SearchByQuery
SearchByQueryRequest struct {
Index string
Query string
NextPageToken []byte
PageSize int
Filter IsRecordValidFilter
Index string
Query string
NextPageToken []byte
PageSize int
Filter IsRecordValidFilter
MaxResultWindow int
}

// ScanByQueryRequest is request for SearchByQuery
Expand Down
38 changes: 23 additions & 15 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,11 +162,12 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutions(
}

resp, err := v.esClient.Search(ctx, &es.SearchRequest{
Index: v.index,
ListRequest: request,
IsOpen: true,
Filter: isRecordValid,
MatchQuery: nil,
Index: v.index,
ListRequest: request,
IsOpen: true,
Filter: isRecordValid,
MatchQuery: nil,
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Expand All @@ -185,11 +186,12 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutions(
}

resp, err := v.esClient.Search(ctx, &es.SearchRequest{
Index: v.index,
ListRequest: request,
IsOpen: false,
Filter: isRecordValid,
MatchQuery: nil,
Index: v.index,
ListRequest: request,
IsOpen: false,
Filter: isRecordValid,
MatchQuery: nil,
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Expand All @@ -216,6 +218,7 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutionsByType(
Name: es.WorkflowType,
Text: request.WorkflowTypeName,
},
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Expand All @@ -242,6 +245,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByType(
Name: es.WorkflowType,
Text: request.WorkflowTypeName,
},
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Expand All @@ -268,6 +272,7 @@ func (v *esVisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
Name: es.WorkflowID,
Text: request.WorkflowID,
},
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Expand All @@ -294,6 +299,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
Name: es.WorkflowID,
Text: request.WorkflowID,
},
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Expand All @@ -320,6 +326,7 @@ func (v *esVisibilityStore) ListClosedWorkflowExecutionsByStatus(
Name: es.CloseStatus,
Text: int32(*thrift.FromWorkflowExecutionCloseStatus(&request.Status)),
},
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Expand Down Expand Up @@ -374,11 +381,12 @@ func (v *esVisibilityStore) ListWorkflowExecutions(
}

resp, err := v.esClient.SearchByQuery(ctx, &es.SearchByQueryRequest{
Index: v.index,
Query: queryDSL,
NextPageToken: request.NextPageToken,
PageSize: request.PageSize,
Filter: nil,
Index: v.index,
Query: queryDSL,
NextPageToken: request.NextPageToken,
PageSize: request.PageSize,
Filter: nil,
MaxResultWindow: v.config.ESIndexMaxResultWindow(),
})
if err != nil {
return nil, &types.InternalServiceError{
Expand Down
12 changes: 11 additions & 1 deletion common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ var (
filterByWID = fmt.Sprintf("map[match:map[WorkflowID:map[query:%s]]]", testWorkflowID)
filterByRunID = fmt.Sprintf("map[match:map[RunID:map[query:%s]]]", testRunID)
filterByStatus = fmt.Sprintf("map[match:map[CloseStatus:map[query:%v]]]", testCloseStatus)

esIndexMaxResultWindow = 3
)

func TestESVisibilitySuite(t *testing.T) {
Expand All @@ -101,7 +103,7 @@ func (s *ESVisibilitySuite) SetupTest() {

s.mockESClient = &esMocks.GenericClient{}
config := &config.VisibilityConfig{
ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(3),
ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(esIndexMaxResultWindow),
ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()),
}

Expand Down Expand Up @@ -234,6 +236,7 @@ func (s *ESVisibilitySuite) TestRecordWorkflowExecutionClosed_EmptyRequest() {
func (s *ESVisibilitySuite) TestListOpenWorkflowExecutions() {
s.mockESClient.On("Search", mock.Anything, mock.MatchedBy(func(input *es.SearchRequest) bool {
s.True(input.IsOpen)
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

Expand All @@ -254,6 +257,7 @@ func (s *ESVisibilitySuite) TestListOpenWorkflowExecutions() {
func (s *ESVisibilitySuite) TestListClosedWorkflowExecutions() {
s.mockESClient.On("Search", mock.Anything, mock.MatchedBy(func(input *es.SearchRequest) bool {
s.False(input.IsOpen)
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

Expand All @@ -276,6 +280,7 @@ func (s *ESVisibilitySuite) TestListOpenWorkflowExecutionsByType() {
s.True(input.IsOpen)
s.Equal(es.WorkflowType, input.MatchQuery.Name)
s.Equal(testWorkflowType, input.MatchQuery.Text)
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

Expand Down Expand Up @@ -303,6 +308,7 @@ func (s *ESVisibilitySuite) TestListClosedWorkflowExecutionsByType() {
s.False(input.IsOpen)
s.Equal(es.WorkflowType, input.MatchQuery.Name)
s.Equal(testWorkflowType, input.MatchQuery.Text)
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

Expand Down Expand Up @@ -330,6 +336,7 @@ func (s *ESVisibilitySuite) TestListOpenWorkflowExecutionsByWorkflowID() {
s.True(input.IsOpen)
s.Equal(es.WorkflowID, input.MatchQuery.Name)
s.Equal(testWorkflowID, input.MatchQuery.Text)
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

Expand Down Expand Up @@ -357,6 +364,7 @@ func (s *ESVisibilitySuite) TestListClosedWorkflowExecutionsByWorkflowID() {
s.False(input.IsOpen)
s.Equal(es.WorkflowID, input.MatchQuery.Name)
s.Equal(testWorkflowID, input.MatchQuery.Text)
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

Expand Down Expand Up @@ -384,6 +392,7 @@ func (s *ESVisibilitySuite) TestListClosedWorkflowExecutionsByStatus() {
s.False(input.IsOpen)
s.Equal(es.CloseStatus, input.MatchQuery.Name)
s.Equal(testCloseStatus, input.MatchQuery.Text)
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

Expand Down Expand Up @@ -705,6 +714,7 @@ func (s *ESVisibilitySuite) TestAddDomainToQuery() {
func (s *ESVisibilitySuite) TestListWorkflowExecutions() {
s.mockESClient.On("SearchByQuery", mock.Anything, mock.MatchedBy(func(input *es.SearchByQueryRequest) bool {
s.True(strings.Contains(input.Query, `{"match_phrase":{"CloseStatus":{"query":"5"}}}`))
s.Equal(esIndexMaxResultWindow, input.MaxResultWindow)
return true
})).Return(testSearchResult, nil).Once()

Expand Down
2 changes: 1 addition & 1 deletion host/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func NewCluster(options *TestClusterConfig, logger log.Logger) (*TestCluster, er
ESIndexMaxResultWindow: dynamicconfig.GetIntPropertyFn(defaultTestValueOfESIndexMaxResultWindow),
ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()),
}
esClient, err = elasticsearch.NewGenericClient(options.ESConfig, visConfig, logger)
esClient, err = elasticsearch.NewGenericClient(options.ESConfig, logger)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit a201ed7

Please sign in to comment.