diff --git a/common/definition/indexedKeys.go b/common/definition/indexedKeys.go index de07b5588b9..50a6c3fd309 100644 --- a/common/definition/indexedKeys.go +++ b/common/definition/indexedKeys.go @@ -52,14 +52,15 @@ const ( CadenceChangeVersion = "CadenceChangeVersion" ) -// valid non-indexed fields on ES const ( + // Memo is valid non-indexed fields on ES Memo = "Memo" + // Attr is prefix of custom search attributes + Attr = "Attr" + // HeaderFormat is the format of context headers in search attributes + HeaderFormat = "Header.%s" ) -// Attr is prefix of custom search attributes -const Attr = "Attr" - // defaultIndexedKeys defines all searchable keys var defaultIndexedKeys = createDefaultIndexedKeys() diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 59ad94963a7..366f2fa0d4b 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1695,6 +1695,12 @@ const ( // Default value: false // Allowed filters: DomainName EnableConsistentQueryByDomain + // EnableContextHeaderInVisibility is key for enable context header in visibility + // KeyName: history.enableContextHeaderInVisibility + // Value type: Bool + // Default value: false + // Allowed filters: DomainName + EnableContextHeaderInVisibility // EnableCrossClusterEngine is used as an overall switch for the cross-cluster feature, a feature which, if not enabled // can be quite expensive in terms of resources // KeyName: history.enableCrossClusterEngine @@ -4061,6 +4067,12 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain", DefaultValue: false, }, + EnableContextHeaderInVisibility: { + KeyName: "history.enableContextHeaderInVisibility", + Filters: []Filter{DomainName}, + Description: "EnableContextHeaderInVisibility is key for enable context header in visibility", + DefaultValue: false, + }, EnableCrossClusterEngine: { KeyName: "history.enableCrossClusterEngine", Description: "an overall toggle for the cross-cluster domain feature", diff --git a/service/history/config/config.go b/service/history/config/config.go index c44202b1466..8932e0c4f4d 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -302,6 +302,9 @@ type Config struct { EnableConsistentQueryByDomain dynamicconfig.BoolPropertyFnWithDomainFilter MaxBufferedQueryCount dynamicconfig.IntPropertyFn + // EnableContextHeaderInVisibility whether to enable indexing context header in visibility + EnableContextHeaderInVisibility dynamicconfig.BoolPropertyFnWithDomainFilter + EnableCrossClusterEngine dynamicconfig.BoolPropertyFn EnableCrossClusterOperationsForDomain dynamicconfig.BoolPropertyFnWithDomainFilter @@ -570,6 +573,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery), EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain), + EnableContextHeaderInVisibility: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableContextHeaderInVisibility), EnableCrossClusterEngine: dc.GetBoolProperty(dynamicconfig.EnableCrossClusterEngine), EnableCrossClusterOperationsForDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperationsForDomain), MaxBufferedQueryCount: dc.GetIntProperty(dynamicconfig.MaxBufferedQueryCount), diff --git a/service/history/task/transfer_active_task_executor.go b/service/history/task/transfer_active_task_executor.go index 77b2ff9a98b..17774e43bb4 100644 --- a/service/history/task/transfer_active_task_executor.go +++ b/service/history/task/transfer_active_task_executor.go @@ -1045,6 +1045,11 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper( executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent) visibilityMemo := getWorkflowMemo(executionInfo.Memo) searchAttr := copySearchAttributes(executionInfo.SearchAttributes) + if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) { + if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil { + searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()) + } + } isCron := len(executionInfo.CronSchedule) > 0 numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters)) updateTimestamp := t.shard.GetTimeSource().Now() diff --git a/service/history/task/transfer_active_task_executor_test.go b/service/history/task/transfer_active_task_executor_test.go index 3d8e54a3fb6..39ba77dae48 100644 --- a/service/history/task/transfer_active_task_executor_test.go +++ b/service/history/task/transfer_active_task_executor_test.go @@ -1696,7 +1696,57 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTask() "RecordWorkflowExecutionStarted", mock.Anything, createRecordWorkflowExecutionStartedRequest( - s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()), + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), + false), + ).Once().Return(nil) + + err = s.transferActiveTaskExecutor.Execute(transferTask, true) + s.Nil(err) +} + +func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWithContextHeader() { + // switch on context header in viz + s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { return true } + s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} { + return map[string]interface{}{ + "Header.contextKey": struct{}{}, + } + } + + workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID) + s.NoError(err) + executionInfo := mutableState.GetExecutionInfo() + executionInfo.CronSchedule = "@every 5s" + startEvent, err := mutableState.GetStartEvent(context.Background()) + s.NoError(err) + startEvent.WorkflowExecutionStartedEventAttributes.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(5) + + transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{ + Version: s.version, + DomainID: s.domainID, + WorkflowID: workflowExecution.GetWorkflowID(), + RunID: workflowExecution.GetRunID(), + TaskID: int64(59), + TaskList: mutableState.GetExecutionInfo().TaskList, + TaskType: persistence.TransferTaskTypeRecordWorkflowStarted, + }) + + persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion()) + s.NoError(err) + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + if s.mockShard.GetConfig().EnableRecordWorkflowExecutionUninitialized(s.domainName) { + s.mockVisibilityMgr.On( + "RecordWorkflowExecutionUninitialized", + mock.Anything, + createRecordWorkflowExecutionUninitializedRequest(transferTask, mutableState, s.mockShard.GetTimeSource().Now(), 1234), + ).Once().Return(nil) + } + s.mockVisibilityMgr.On( + "RecordWorkflowExecutionStarted", + mock.Anything, + createRecordWorkflowExecutionStartedRequest( + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), + true), ).Once().Return(nil) err = s.transferActiveTaskExecutor.Execute(transferTask, true) @@ -1727,7 +1777,47 @@ func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttribu "UpsertWorkflowExecution", mock.Anything, createUpsertWorkflowSearchAttributesRequest( - s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()), + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), + false), + ).Once().Return(nil) + + err = s.transferActiveTaskExecutor.Execute(transferTask, true) + s.Nil(err) +} + +func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttributesWithContextHeader() { + // switch on context header in viz + s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { return true } + s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} { + return map[string]interface{}{ + "Header.contextKey": struct{}{}, + } + } + + workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID) + s.NoError(err) + + transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{ + Version: s.version, + DomainID: s.domainID, + WorkflowID: workflowExecution.GetWorkflowID(), + RunID: workflowExecution.GetRunID(), + TaskID: int64(59), + TaskList: mutableState.GetExecutionInfo().TaskList, + TaskType: persistence.TransferTaskTypeUpsertWorkflowSearchAttributes, + }) + + persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion()) + s.NoError(err) + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + startEvent, err := mutableState.GetStartEvent(context.Background()) + s.NoError(err) + s.mockVisibilityMgr.On( + "UpsertWorkflowExecution", + mock.Anything, + createUpsertWorkflowSearchAttributesRequest( + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), + true), ).Once().Return(nil) err = s.transferActiveTaskExecutor.Execute(transferTask, true) @@ -1834,6 +1924,7 @@ func createRecordWorkflowExecutionStartedRequest( mutableState execution.MutableState, numClusters int16, updateTime time.Time, + enableContextHeaderInVisibility bool, ) *persistence.RecordWorkflowExecutionStartedRequest { taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo) workflowExecution := types.WorkflowExecution{ @@ -1846,6 +1937,12 @@ func createRecordWorkflowExecutionStartedRequest( if backoffSeconds != 0 { executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second) } + var searchAttributes map[string][]byte + if enableContextHeaderInVisibility { + searchAttributes = map[string][]byte{ + "Header.contextKey": []byte("contextValue"), + } + } return &persistence.RecordWorkflowExecutionStartedRequest{ Domain: domainName, DomainUUID: taskInfo.DomainID, @@ -1859,6 +1956,7 @@ func createRecordWorkflowExecutionStartedRequest( IsCron: len(executionInfo.CronSchedule) > 0, NumClusters: numClusters, UpdateTimestamp: updateTime.UnixNano(), + SearchAttributes: searchAttributes, } } @@ -1975,6 +2073,7 @@ func createUpsertWorkflowSearchAttributesRequest( mutableState execution.MutableState, numClusters int16, updateTime time.Time, + enableContextHeaderInVisibility bool, ) *persistence.UpsertWorkflowExecutionRequest { taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo) @@ -1988,6 +2087,12 @@ func createUpsertWorkflowSearchAttributesRequest( if backoffSeconds != 0 { executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second) } + var searchAttributes map[string][]byte + if enableContextHeaderInVisibility { + searchAttributes = map[string][]byte{ + "Header.contextKey": []byte("contextValue"), + } + } return &persistence.UpsertWorkflowExecutionRequest{ Domain: domainName, @@ -2002,6 +2107,7 @@ func createUpsertWorkflowSearchAttributesRequest( IsCron: len(executionInfo.CronSchedule) > 0, NumClusters: numClusters, UpdateTimestamp: updateTime.UnixNano(), + SearchAttributes: searchAttributes, } } diff --git a/service/history/task/transfer_standby_task_executor.go b/service/history/task/transfer_standby_task_executor.go index 8b1ff037d11..11488f375fd 100644 --- a/service/history/task/transfer_standby_task_executor.go +++ b/service/history/task/transfer_standby_task_executor.go @@ -483,7 +483,6 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper startTimestamp := startEvent.GetTimestamp() executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent) visibilityMemo := getWorkflowMemo(executionInfo.Memo) - searchAttr := copySearchAttributes(executionInfo.SearchAttributes) isCron := len(executionInfo.CronSchedule) > 0 updateTimestamp := t.shard.GetTimeSource().Now() @@ -493,6 +492,13 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper } numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters)) + searchAttr := copySearchAttributes(executionInfo.SearchAttributes) + if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) { + if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil { + searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes()) + } + } + if isRecordStart { workflowStartedScope.IncCounter(metrics.WorkflowStartedCount) return t.recordWorkflowStarted( diff --git a/service/history/task/transfer_standby_task_executor_test.go b/service/history/task/transfer_standby_task_executor_test.go index 8614e7d9b29..6a5fe067ae7 100644 --- a/service/history/task/transfer_standby_task_executor_test.go +++ b/service/history/task/transfer_standby_task_executor_test.go @@ -38,6 +38,7 @@ import ( "github.com/uber/cadence/common/cache" "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/cluster" + dc "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/mocks" "github.com/uber/cadence/common/ndc" @@ -800,7 +801,62 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTask( "RecordWorkflowExecutionStarted", mock.Anything, createRecordWorkflowExecutionStartedRequest( - s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()), + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), + false), + ).Return(nil).Once() + + s.mockShard.SetCurrentTime(s.clusterName, now) + err = s.transferStandbyTaskExecutor.Execute(transferTask, true) + s.Nil(err) +} + +func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWithContextHeader() { + // switch on context header in viz + s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { + return true + } + s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} { + return map[string]interface{}{ + "Header.contextKey": struct{}{}, + } + } + + workflowExecution, mutableState, err := test.StartWorkflow(s.mockShard, s.domainID) + s.NoError(err) + executionInfo := mutableState.GetExecutionInfo() + executionInfo.CronSchedule = "@every 5s" + startEvent, err := mutableState.GetStartEvent(context.Background()) + s.NoError(err) + startEvent.WorkflowExecutionStartedEventAttributes.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(5) + + now := time.Now() + transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{ + Version: s.version, + DomainID: s.domainID, + WorkflowID: workflowExecution.GetWorkflowID(), + RunID: workflowExecution.GetRunID(), + VisibilityTimestamp: now, + TaskID: int64(59), + TaskList: mutableState.GetExecutionInfo().TaskList, + TaskType: persistence.TransferTaskTypeRecordWorkflowStarted, + }) + + persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, startEvent.ID, startEvent.Version) + s.NoError(err) + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + if s.mockShard.GetConfig().EnableRecordWorkflowExecutionUninitialized(s.domainName) { + s.mockVisibilityMgr.On( + "RecordWorkflowExecutionUninitialized", + mock.Anything, + createRecordWorkflowExecutionUninitializedRequest(transferTask, mutableState, s.mockShard.GetTimeSource().Now(), 1234), + ).Once().Return(nil) + } + s.mockVisibilityMgr.On( + "RecordWorkflowExecutionStarted", + mock.Anything, + createRecordWorkflowExecutionStartedRequest( + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), + true), ).Return(nil).Once() s.mockShard.SetCurrentTime(s.clusterName, now) @@ -834,7 +890,52 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttrib "UpsertWorkflowExecution", mock.Anything, createUpsertWorkflowSearchAttributesRequest( - s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()), + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), + false), + ).Return(nil).Once() + + s.mockShard.SetCurrentTime(s.clusterName, now) + err = s.transferStandbyTaskExecutor.Execute(transferTask, true) + s.Nil(err) +} + +func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttributesTaskWithContextHeader() { + // switch on context header in viz + s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { + return true + } + s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} { + return map[string]interface{}{ + "Header.contextKey": struct{}{}, + } + } + + workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID) + s.NoError(err) + + now := time.Now() + transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{ + Version: s.version, + DomainID: s.domainID, + WorkflowID: workflowExecution.GetWorkflowID(), + RunID: workflowExecution.GetRunID(), + VisibilityTimestamp: now, + TaskID: int64(59), + TaskList: mutableState.GetExecutionInfo().TaskList, + TaskType: persistence.TransferTaskTypeUpsertWorkflowSearchAttributes, + }) + + persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion()) + s.NoError(err) + s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil) + startEvent, err := mutableState.GetStartEvent(context.Background()) + s.NoError(err) + s.mockVisibilityMgr.On( + "UpsertWorkflowExecution", + mock.Anything, + createUpsertWorkflowSearchAttributesRequest( + s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(), + true), ).Return(nil).Once() s.mockShard.SetCurrentTime(s.clusterName, now) diff --git a/service/history/task/transfer_task_executor_base.go b/service/history/task/transfer_task_executor_base.go index 94c8fda52a0..ae3ba9d00f1 100644 --- a/service/history/task/transfer_task_executor_base.go +++ b/service/history/task/transfer_task_executor_base.go @@ -22,11 +22,13 @@ package task import ( "context" + "fmt" "time" "github.com/uber/cadence/client/matching" "github.com/uber/cadence/common" "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/definition" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/metrics" @@ -397,6 +399,25 @@ func getWorkflowMemo( return &types.Memo{Fields: memo} } +func appendContextHeaderToSearchAttributes(attr, context map[string][]byte, allowedKeys map[string]interface{}) map[string][]byte { + for k, v := range context { + key := fmt.Sprintf(definition.HeaderFormat, k) + if _, ok := attr[key]; ok { // skip if key already exists + continue + } + if _, allowed := allowedKeys[key]; !allowed { // skip if not allowed + continue + } + val := make([]byte, len(v)) + copy(val, v) + if attr == nil { + attr = make(map[string][]byte) + } + attr[key] = val + } + return attr +} + func copySearchAttributes( input map[string][]byte, ) map[string][]byte { diff --git a/service/history/testing/workflow_util.go b/service/history/testing/workflow_util.go index e3637c06e43..21210119722 100644 --- a/service/history/testing/workflow_util.go +++ b/service/history/testing/workflow_util.go @@ -64,6 +64,10 @@ func StartWorkflow( TaskList: &types.TaskList{Name: taskListName}, ExecutionStartToCloseTimeoutSeconds: common.Int32Ptr(2), TaskStartToCloseTimeoutSeconds: common.Int32Ptr(1), + Header: &types.Header{Fields: map[string][]byte{ + "contextKey": []byte("contextValue"), + "invalidContextKey": []byte("invalidContextValue"), + }}, }, PartitionConfig: map[string]string{"userid": uuid.New()}, },