From 6e69fa1a6e9ae5d2f683759820f09d1286ba7797 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 4 Jan 2021 17:05:21 -0800 Subject: [PATCH] Support visibility query with close status represented in string (#3865) --- .../elasticsearch/esVisibilityStore.go | 57 +++++++++++++++++-- .../elasticsearch/esVisibilityStore_test.go | 56 ++++++++++++++++-- common/service/dynamicconfig/constants.go | 11 +++- common/types/enums.go | 2 +- tools/cli/defs.go | 32 ++++++----- tools/cli/flags.go | 2 +- tools/cli/workflowCommands.go | 2 +- 7 files changed, 130 insertions(+), 32 deletions(-) diff --git a/common/persistence/elasticsearch/esVisibilityStore.go b/common/persistence/elasticsearch/esVisibilityStore.go index 0c4c306000e..5110dd85f02 100644 --- a/common/persistence/elasticsearch/esVisibilityStore.go +++ b/common/persistence/elasticsearch/esVisibilityStore.go @@ -468,9 +468,9 @@ const ( var ( timeKeys = map[string]bool{ - "StartTime": true, - "CloseTime": true, - "ExecutionTime": true, + es.StartTime: true, + es.CloseTime: true, + es.ExecutionTime: true, } rangeKeys = map[string]bool{ "from": true, @@ -575,7 +575,7 @@ func getCustomizedDSLFromSQL(sql string, domainID string) (*fastjson.Value, erro addQueryForExecutionTime(dsl) } addDomainToQuery(dsl, domainID) - if err := processAllValuesForKey(dsl, timeKeyFilter, timeProcessFunc); err != nil { + if err := processAllValuesForKey(dsl, isCombinedKey, combinedProcessFunc); err != nil { return nil, err } return dsl, nil @@ -782,7 +782,9 @@ func checkPageSize(request *p.ListWorkflowExecutionsByQueryRequest) { } } -func processAllValuesForKey(dsl *fastjson.Value, keyFilter func(k string) bool, +func processAllValuesForKey( + dsl *fastjson.Value, + keyFilter func(k string) bool, processFunc func(obj *fastjson.Object, key string, v *fastjson.Value) error, ) error { switch dsl.Type() { @@ -817,7 +819,23 @@ func processAllValuesForKey(dsl *fastjson.Value, keyFilter func(k string) bool, return nil } -func timeKeyFilter(key string) bool { +func isCombinedKey(key string) bool { + return isTimeKey(key) || isCloseStatusKey(key) +} + +func combinedProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) error { + if isTimeKey(key) { + return timeProcessFunc(obj, key, value) + } + + if isCloseStatusKey(key) { + return closeStatusProcessFunc(obj, key, value) + } + + return fmt.Errorf("unknown es dsl key %v for processing value", key) +} + +func isTimeKey(key string) bool { return timeKeys[key] } @@ -843,6 +861,33 @@ func timeProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) er }) } +func isCloseStatusKey(key string) bool { + return key == es.CloseStatus +} + +func closeStatusProcessFunc(obj *fastjson.Object, key string, value *fastjson.Value) error { + return processAllValuesForKey(value, func(key string) bool { + return rangeKeys[key] + }, func(obj *fastjson.Object, key string, v *fastjson.Value) error { + statusStr := string(v.GetStringBytes()) + + // first check if already in int64 format + if _, err := strconv.ParseInt(statusStr, 10, 64); err == nil { + return nil + } + + // try to parse close status string + var parsedStatus types.WorkflowExecutionCloseStatus + err := parsedStatus.UnmarshalText([]byte(statusStr)) + if err != nil { + return err + } + + obj.Set(key, fastjson.MustParse(fmt.Sprintf(`"%d"`, parsedStatus))) + return nil + }) +} + // elasticsql may transfer `Attr.Name` to "`Attr.Name`" instead of "Attr.Name" in dsl in some operator like "between and" // this function is used to clean up func cleanDSL(input string) string { diff --git a/common/persistence/elasticsearch/esVisibilityStore_test.go b/common/persistence/elasticsearch/esVisibilityStore_test.go index c6642fdd68d..4f721260e4c 100644 --- a/common/persistence/elasticsearch/esVisibilityStore_test.go +++ b/common/persistence/elasticsearch/esVisibilityStore_test.go @@ -588,6 +588,11 @@ func (s *ESVisibilitySuite) TestGetESQueryDSL() { s.Nil(err) s.Equal(`{"query":{"bool":{"must":[{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}},{"bool":{"should":[{"match_phrase":{"WorkflowID":{"query":"wid"}}},{"bool":{"must_not":{"exists":{"field":"CloseTime"}}}}]}}]}},"from":0,"size":10,"sort":[{"StartTime":"desc"},{"RunID":"desc"}]}`, dsl) + request.Query = `WorkflowID = 'wid' and CloseStatus = "completed"` + dsl, err = v.getESQueryDSL(request, token) + s.Nil(err) + s.Equal(`{"query":{"bool":{"must":[{"match_phrase":{"DomainID":{"query":"bfd5c907-f899-4baf-a7b2-2ab85e623ebd"}}},{"bool":{"must":[{"match_phrase":{"WorkflowID":{"query":"wid"}}},{"match_phrase":{"CloseStatus":{"query":"0"}}}]}}]}},"from":0,"size":10,"sort":[{"StartTime":"desc"},{"RunID":"desc"}]}`, dsl) + request.Query = `CloseTime = missing order by CloseTime desc` dsl, err = v.getESQueryDSL(request, token) s.Nil(err) @@ -836,7 +841,7 @@ func (s *ESVisibilitySuite) TestTimeProcessFunc() { returnErr bool }{ {value: `"1528358645000000000"`, returnErr: false}, - {value: `"1528358645000000000"`}, + {value: `"1528358645000000000"`, returnErr: false}, {value: "", returnErr: true}, {value: `"should not be modified"`, returnErr: false}, } @@ -852,6 +857,45 @@ func (s *ESVisibilitySuite) TestTimeProcessFunc() { } } +func (s *ESVisibilitySuite) TestCloseStatusProcessFunc() { + cases := []struct { + key string + value string + }{ + {key: "from", value: types.WorkflowExecutionCloseStatusTerminated.String()}, + {key: "to", value: strings.ToLower(types.WorkflowExecutionCloseStatusContinuedAsNew.String())}, + {key: "gt", value: fmt.Sprintf(`%d`, types.WorkflowExecutionCloseStatusFailed)}, + {key: "query", value: types.WorkflowExecutionCloseStatusCompleted.String()}, + {key: "query", value: fmt.Sprintf(`%d`, types.WorkflowExecutionCloseStatusCanceled)}, + {key: "query", value: strings.ToTitle(strings.ToLower(types.WorkflowExecutionCloseStatusTimedOut.String()))}, + {key: "query", value: "timeout"}, + {key: "unrelatedKey", value: "should not be modified"}, + } + expected := []struct { + value string + returnErr bool + }{ + {value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusTerminated), returnErr: false}, + {value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusContinuedAsNew), returnErr: false}, + {value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusFailed), returnErr: false}, + {value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusCompleted), returnErr: false}, + {value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusCanceled), returnErr: false}, + {value: fmt.Sprintf(`"%d"`, types.WorkflowExecutionCloseStatusTimedOut), returnErr: false}, + {value: "", returnErr: true}, + {value: `"should not be modified"`, returnErr: false}, + } + + for i, testCase := range cases { + value := fastjson.MustParse(fmt.Sprintf(`{"%s": "%s"}`, testCase.key, testCase.value)) + err := closeStatusProcessFunc(nil, "", value) + if expected[i].returnErr { + s.Error(err) + continue + } + s.Equal(expected[i].value, value.Get(testCase.key).String()) + } +} + func (s *ESVisibilitySuite) TestProcessAllValuesForKey() { testJSONStr := `{ "arrayKey": [ @@ -881,11 +925,11 @@ func (s *ESVisibilitySuite) TestProcessAllValuesForKey() { s.NoError(processAllValuesForKey(dsl, testKeyFilter, testProcessFunc)) expectedProcessedValue := map[string]struct{}{ - `"value1"`: struct{}{}, - `"value2"`: struct{}{}, - `"value5"`: struct{}{}, - `[{"testKey7":"should not be processed"}]`: struct{}{}, - `"value8"`: struct{}{}, + `"value1"`: {}, + `"value2"`: {}, + `"value5"`: {}, + `[{"testKey7":"should not be processed"}]`: {}, + `"value8"`: {}, } s.Equal(expectedProcessedValue, processedValue) } diff --git a/common/service/dynamicconfig/constants.go b/common/service/dynamicconfig/constants.go index 21187bedad0..a19d8e8e0fd 100644 --- a/common/service/dynamicconfig/constants.go +++ b/common/service/dynamicconfig/constants.go @@ -349,13 +349,20 @@ const ( testGetBoolPropertyFilteredByDomainIDKey testGetBoolPropertyFilteredByTaskListInfoKey - // used by internal repos, need to moved out of this repo - // TODO https://github.com/uber/cadence/issues/3861 + // TODO: following configs are used by internal repo, need to moved out of this repo + // https://github.com/uber/cadence/issues/3861 + + // EnableAuthorization is the key to enable authorization for a domain EnableAuthorization + // VisibilityArchivalQueryMaxRangeInDays is the maximum number of days for a visibility archival query VisibilityArchivalQueryMaxRangeInDays + // VisibilityArchivalQueryMaxQPS is the timeout for a visibility archival query VisibilityArchivalQueryMaxQPS + // EnableArchivalCompression indicates whether blobs are compressed before they are archived EnableArchivalCompression + // WorkerDeterministicConstructionCheckProbability controls the probability of running a deterministic construction check for any given archival WorkerDeterministicConstructionCheckProbability + // WorkerBlobIntegrityCheckProbability controls the probability of running an integrity check for any given archival WorkerBlobIntegrityCheckProbability // EnableGlobalDomain is key for enable global domain diff --git a/common/types/enums.go b/common/types/enums.go index e3561938cb0..bb58f80400a 100644 --- a/common/types/enums.go +++ b/common/types/enums.go @@ -68,7 +68,7 @@ func EventTypeValues() []EventType { } } -// DecisionType_Values returns all recognized values of DecisionType. +// DecisionTypeValues returns all recognized values of DecisionType. func DecisionTypeValues() []DecisionType { return []DecisionType{ DecisionTypeScheduleActivityTask, diff --git a/tools/cli/defs.go b/tools/cli/defs.go index e8bd1c340d4..16b9e7b8f23 100644 --- a/tools/cli/defs.go +++ b/tools/cli/defs.go @@ -113,23 +113,25 @@ var ( optionErr = "there is something wrong with your command options" osExit = os.Exit workflowClosedStatusMap = map[string]s.WorkflowExecutionCloseStatus{ - "completed": s.WorkflowExecutionCloseStatusCompleted, - "failed": s.WorkflowExecutionCloseStatusFailed, - "canceled": s.WorkflowExecutionCloseStatusCanceled, - "terminated": s.WorkflowExecutionCloseStatusTerminated, + "completed": s.WorkflowExecutionCloseStatusCompleted, + "failed": s.WorkflowExecutionCloseStatusFailed, + "canceled": s.WorkflowExecutionCloseStatusCanceled, + "terminated": s.WorkflowExecutionCloseStatusTerminated, + "continued_as_new": s.WorkflowExecutionCloseStatusContinuedAsNew, + "timed_out": s.WorkflowExecutionCloseStatusTimedOut, + // below are some alias + "c": s.WorkflowExecutionCloseStatusCompleted, + "complete": s.WorkflowExecutionCloseStatusCompleted, + "f": s.WorkflowExecutionCloseStatusFailed, + "fail": s.WorkflowExecutionCloseStatusFailed, + "cancel": s.WorkflowExecutionCloseStatusCanceled, + "terminate": s.WorkflowExecutionCloseStatusTerminated, + "term": s.WorkflowExecutionCloseStatusTerminated, + "continue": s.WorkflowExecutionCloseStatusContinuedAsNew, + "cont": s.WorkflowExecutionCloseStatusContinuedAsNew, "continuedasnew": s.WorkflowExecutionCloseStatusContinuedAsNew, "continueasnew": s.WorkflowExecutionCloseStatusContinuedAsNew, "timedout": s.WorkflowExecutionCloseStatusTimedOut, - // below are some alias - "c": s.WorkflowExecutionCloseStatusCompleted, - "complete": s.WorkflowExecutionCloseStatusCompleted, - "f": s.WorkflowExecutionCloseStatusFailed, - "fail": s.WorkflowExecutionCloseStatusFailed, - "cancel": s.WorkflowExecutionCloseStatusCanceled, - "terminate": s.WorkflowExecutionCloseStatusTerminated, - "term": s.WorkflowExecutionCloseStatusTerminated, - "continue": s.WorkflowExecutionCloseStatusContinuedAsNew, - "cont": s.WorkflowExecutionCloseStatusContinuedAsNew, - "timeout": s.WorkflowExecutionCloseStatusTimedOut, + "timeout": s.WorkflowExecutionCloseStatusTimedOut, } ) diff --git a/tools/cli/flags.go b/tools/cli/flags.go index 0e3cb3e76cc..d6aadcfde6e 100644 --- a/tools/cli/flags.go +++ b/tools/cli/flags.go @@ -566,7 +566,7 @@ func getFlagsForListAll() []cli.Flag { }, cli.StringFlag{ Name: FlagWorkflowStatusWithAlias, - Usage: "Closed workflow status [completed, failed, canceled, terminated, continuedasnew, timedout]", + Usage: "Closed workflow status [completed, failed, canceled, terminated, continued_as_new, timed_out]", }, cli.StringFlag{ Name: FlagListQueryWithAlias, diff --git a/tools/cli/workflowCommands.go b/tools/cli/workflowCommands.go index 1ccb07b8cbb..ea9ff4ff34e 100644 --- a/tools/cli/workflowCommands.go +++ b/tools/cli/workflowCommands.go @@ -1450,7 +1450,7 @@ func getWorkflowStatus(statusStr string) s.WorkflowExecutionCloseStatus { return status } ErrorAndExit(optionErr, errors.New("option status is not one of allowed values "+ - "[completed, failed, canceled, terminated, continueasnew, timedout]")) + "[completed, failed, canceled, terminated, continued_as_new, timed_out]")) return 0 }