Skip to content

Commit

Permalink
Support visibility query with close status represented in string (#3865
Browse files Browse the repository at this point in the history
)
  • Loading branch information
yycptt authored Jan 5, 2021
1 parent eb161e6 commit 6e69fa1
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 32 deletions.
57 changes: 51 additions & 6 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,9 +468,9 @@ const (

var (
timeKeys = map[string]bool{
"StartTime": true,
"CloseTime": true,
"ExecutionTime": true,
es.StartTime: true,
es.CloseTime: true,
es.ExecutionTime: true,
}
rangeKeys = map[string]bool{
"from": true,
Expand Down Expand Up @@ -575,7 +575,7 @@ func getCustomizedDSLFromSQL(sql string, domainID string) (*fastjson.Value, erro
addQueryForExecutionTime(dsl)
}
addDomainToQuery(dsl, domainID)
if err := processAllValuesForKey(dsl, timeKeyFilter, timeProcessFunc); err != nil {
if err := processAllValuesForKey(dsl, isCombinedKey, combinedProcessFunc); err != nil {
return nil, err
}
return dsl, nil
Expand Down Expand Up @@ -782,7 +782,9 @@ func checkPageSize(request *p.ListWorkflowExecutionsByQueryRequest) {
}
}

func processAllValuesForKey(dsl *fastjson.Value, keyFilter func(k string) bool,
func processAllValuesForKey(
dsl *fastjson.Value,
keyFilter func(k string) bool,
processFunc func(obj *fastjson.Object, key string, v *fastjson.Value) error,
) error {
switch dsl.Type() {
Expand Down Expand Up @@ -817,7 +819,23 @@ func processAllValuesForKey(dsl *fastjson.Value, keyFilter func(k string) bool,
return nil
}

func timeKeyFilter(key string) bool {
func isCombinedKey(key string) bool {
return isTimeKey(key) || isCloseStatusKey(key)
}

func combinedProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) error {
if isTimeKey(key) {
return timeProcessFunc(obj, key, value)
}

if isCloseStatusKey(key) {
return closeStatusProcessFunc(obj, key, value)
}

return fmt.Errorf("unknown es dsl key %v for processing value", key)
}

func isTimeKey(key string) bool {
return timeKeys[key]
}

Expand All @@ -843,6 +861,33 @@ func timeProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) er
})
}

func isCloseStatusKey(key string) bool {
return key == es.CloseStatus
}

func closeStatusProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) error {
return processAllValuesForKey(value, func(key string) bool {
return rangeKeys[key]
}, func(obj *fastjson.Object, key string, v *fastjson.Value) error {
statusStr := string(v.GetStringBytes())

// first check if already in int64 format
if _, err := strconv.ParseInt(statusStr, 10, 64); err == nil {
return nil
}

// try to parse close status string
var parsedStatus types.WorkflowExecutionCloseStatus
err := parsedStatus.UnmarshalText([]byte(statusStr))
if err != nil {
return err
}

obj.Set(key, fastjson.MustParse(fmt.Sprintf(`"%d"`, parsedStatus)))
return nil
})
}

// elasticsql may transfer `Attr.Name` to "`Attr.Name`" instead of "Attr.Name" in dsl in some operator like "between and"
// this function is used to clean up
func cleanDSL(input string) string {
Expand Down
56 changes: 50 additions & 6 deletions common/persistence/elasticsearch/esVisibilityStore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,11 @@ func (s *ESVisibilitySuite) TestGetESQueryDSL() {
s.Nil(err)
s.Equal(`{"query":{"bool":{"must":[{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}},{"bool":{"should":[{"match_phrase":{"WorkflowID":{"query":"wid"}}},{"bool":{"must_not":{"exists":{"field":"CloseTime"}}}}]}}]}},"from":0,"size":10,"sort":[{"StartTime":"desc"},{"RunID":"desc"}]}`, dsl)

request.Query = `WorkflowID = 'wid' and CloseStatus = "completed"`
dsl, err = v.getESQueryDSL(request, token)
s.Nil(err)
s.Equal(`{"query":{"bool":{"must":[{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}},{"bool":{"must":[{"match_phrase":{"WorkflowID":{"query":"wid"}}},{"match_phrase":{"CloseStatus":{"query":"0"}}}]}}]}},"from":0,"size":10,"sort":[{"StartTime":"desc"},{"RunID":"desc"}]}`, dsl)

request.Query = `CloseTime = missing order by CloseTime desc`
dsl, err = v.getESQueryDSL(request, token)
s.Nil(err)
Expand Down Expand Up @@ -836,7 +841,7 @@ func (s *ESVisibilitySuite) TestTimeProcessFunc() {
returnErr bool
}{
{value: `"1528358645000000000"`, returnErr: false},
{value: `"1528358645000000000"`},
{value: `"1528358645000000000"`, returnErr: false},
{value: "", returnErr: true},
{value: `"should not be modified"`, returnErr: false},
}
Expand All @@ -852,6 +857,45 @@ func (s *ESVisibilitySuite) TestTimeProcessFunc() {
}
}

func (s *ESVisibilitySuite) TestCloseStatusProcessFunc() {
cases := []struct {
key string
value string
}{
{key: "from", value: types.WorkflowExecutionCloseStatusTerminated.String()},
{key: "to", value: strings.ToLower(types.WorkflowExecutionCloseStatusContinuedAsNew.String())},
{key: "gt", value: fmt.Sprintf(`%d`, types.WorkflowExecutionCloseStatusFailed)},
{key: "query", value: types.WorkflowExecutionCloseStatusCompleted.String()},
{key: "query", value: fmt.Sprintf(`%d`, types.WorkflowExecutionCloseStatusCanceled)},
{key: "query", value: strings.ToTitle(strings.ToLower(types.WorkflowExecutionCloseStatusTimedOut.String()))},
{key: "query", value: "timeout"},
{key: "unrelatedKey", value: "should not be modified"},
}
expected := []struct {
value string
returnErr bool
}{
{value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusTerminated), returnErr: false},
{value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusContinuedAsNew), returnErr: false},
{value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusFailed), returnErr: false},
{value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusCompleted), returnErr: false},
{value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusCanceled), returnErr: false},
{value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusTimedOut), returnErr: false},
{value: "", returnErr: true},
{value: `"should not be modified"`, returnErr: false},
}

for i, testCase := range cases {
value := fastjson.MustParse(fmt.Sprintf(`{"%s": "%s"}`, testCase.key, testCase.value))
err := closeStatusProcessFunc(nil, "", value)
if expected[i].returnErr {
s.Error(err)
continue
}
s.Equal(expected[i].value, value.Get(testCase.key).String())
}
}

func (s *ESVisibilitySuite) TestProcessAllValuesForKey() {
testJSONStr := `{
"arrayKey": [
Expand Down Expand Up @@ -881,11 +925,11 @@ func (s *ESVisibilitySuite) TestProcessAllValuesForKey() {
s.NoError(processAllValuesForKey(dsl, testKeyFilter, testProcessFunc))

expectedProcessedValue := map[string]struct{}{
`"value1"`: struct{}{},
`"value2"`: struct{}{},
`"value5"`: struct{}{},
`[{"testKey7":"should not be processed"}]`: struct{}{},
`"value8"`: struct{}{},
`"value1"`: {},
`"value2"`: {},
`"value5"`: {},
`[{"testKey7":"should not be processed"}]`: {},
`"value8"`: {},
}
s.Equal(expectedProcessedValue, processedValue)
}
Expand Down
11 changes: 9 additions & 2 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,20 @@ const (
testGetBoolPropertyFilteredByDomainIDKey
testGetBoolPropertyFilteredByTaskListInfoKey

// used by internal repos, need to moved out of this repo
// TODO https://github.com/uber/cadence/issues/3861
// TODO: following configs are used by internal repo, need to moved out of this repo
// https://github.com/uber/cadence/issues/3861

// EnableAuthorization is the key to enable authorization for a domain
EnableAuthorization
// VisibilityArchivalQueryMaxRangeInDays is the maximum number of days for a visibility archival query
VisibilityArchivalQueryMaxRangeInDays
// VisibilityArchivalQueryMaxQPS is the timeout for a visibility archival query
VisibilityArchivalQueryMaxQPS
// EnableArchivalCompression indicates whether blobs are compressed before they are archived
EnableArchivalCompression
// WorkerDeterministicConstructionCheckProbability controls the probability of running a deterministic construction check for any given archival
WorkerDeterministicConstructionCheckProbability
// WorkerBlobIntegrityCheckProbability controls the probability of running an integrity check for any given archival
WorkerBlobIntegrityCheckProbability

// EnableGlobalDomain is key for enable global domain
Expand Down
2 changes: 1 addition & 1 deletion common/types/enums.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func EventTypeValues() []EventType {
}
}

// DecisionType_Values returns all recognized values of DecisionType.
// DecisionTypeValues returns all recognized values of DecisionType.
func DecisionTypeValues() []DecisionType {
return []DecisionType{
DecisionTypeScheduleActivityTask,
Expand Down
32 changes: 17 additions & 15 deletions tools/cli/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,23 +113,25 @@ var (
optionErr = "there is something wrong with your command options"
osExit = os.Exit
workflowClosedStatusMap = map[string]s.WorkflowExecutionCloseStatus{
"completed": s.WorkflowExecutionCloseStatusCompleted,
"failed": s.WorkflowExecutionCloseStatusFailed,
"canceled": s.WorkflowExecutionCloseStatusCanceled,
"terminated": s.WorkflowExecutionCloseStatusTerminated,
"completed": s.WorkflowExecutionCloseStatusCompleted,
"failed": s.WorkflowExecutionCloseStatusFailed,
"canceled": s.WorkflowExecutionCloseStatusCanceled,
"terminated": s.WorkflowExecutionCloseStatusTerminated,
"continued_as_new": s.WorkflowExecutionCloseStatusContinuedAsNew,
"timed_out": s.WorkflowExecutionCloseStatusTimedOut,
// below are some alias
"c": s.WorkflowExecutionCloseStatusCompleted,
"complete": s.WorkflowExecutionCloseStatusCompleted,
"f": s.WorkflowExecutionCloseStatusFailed,
"fail": s.WorkflowExecutionCloseStatusFailed,
"cancel": s.WorkflowExecutionCloseStatusCanceled,
"terminate": s.WorkflowExecutionCloseStatusTerminated,
"term": s.WorkflowExecutionCloseStatusTerminated,
"continue": s.WorkflowExecutionCloseStatusContinuedAsNew,
"cont": s.WorkflowExecutionCloseStatusContinuedAsNew,
"continuedasnew": s.WorkflowExecutionCloseStatusContinuedAsNew,
"continueasnew": s.WorkflowExecutionCloseStatusContinuedAsNew,
"timedout": s.WorkflowExecutionCloseStatusTimedOut,
// below are some alias
"c": s.WorkflowExecutionCloseStatusCompleted,
"complete": s.WorkflowExecutionCloseStatusCompleted,
"f": s.WorkflowExecutionCloseStatusFailed,
"fail": s.WorkflowExecutionCloseStatusFailed,
"cancel": s.WorkflowExecutionCloseStatusCanceled,
"terminate": s.WorkflowExecutionCloseStatusTerminated,
"term": s.WorkflowExecutionCloseStatusTerminated,
"continue": s.WorkflowExecutionCloseStatusContinuedAsNew,
"cont": s.WorkflowExecutionCloseStatusContinuedAsNew,
"timeout": s.WorkflowExecutionCloseStatusTimedOut,
"timeout": s.WorkflowExecutionCloseStatusTimedOut,
}
)
2 changes: 1 addition & 1 deletion tools/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,7 @@ func getFlagsForListAll() []cli.Flag {
},
cli.StringFlag{
Name: FlagWorkflowStatusWithAlias,
Usage: "Closed workflow status [completed, failed, canceled, terminated, continuedasnew, timedout]",
Usage: "Closed workflow status [completed, failed, canceled, terminated, continued_as_new, timed_out]",
},
cli.StringFlag{
Name: FlagListQueryWithAlias,
Expand Down
2 changes: 1 addition & 1 deletion tools/cli/workflowCommands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1450,7 +1450,7 @@ func getWorkflowStatus(statusStr string) s.WorkflowExecutionCloseStatus {
return status
}
ErrorAndExit(optionErr, errors.New("option status is not one of allowed values "+
"[completed, failed, canceled, terminated, continueasnew, timedout]"))
"[completed, failed, canceled, terminated, continued_as_new, timed_out]"))
return 0
}

Expand Down

0 comments on commit 6e69fa1

Please sign in to comment.