Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added feature to index context header in visibility #6066

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ const (
CadenceChangeVersion = "CadenceChangeVersion"
)

// valid non-indexed fields on ES
const (
// Memo is valid non-indexed fields on ES
Memo = "Memo"
// Attr is prefix of custom search attributes
Attr = "Attr"
// HeaderFormat is the format of context headers in search attributes
HeaderFormat = "Header.%s"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add validation somewhere to ensure that users don't create Header.Something search attributes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use admin API AddSearchAttribute from admin CLI to onboard a new search attributes. If we block there, it's going to be manual to update schema on visibility storage directly, which is not ideal.
The validation thus has to be manual

)

// Attr is prefix of custom search attributes
const Attr = "Attr"

// defaultIndexedKeys defines all searchable keys
var defaultIndexedKeys = createDefaultIndexedKeys()

Expand Down
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,12 @@ const (
// Default value: false
// Allowed filters: DomainName
EnableConsistentQueryByDomain
// EnableContextHeaderInVisibility is key for enable context header in visibility
// KeyName: history.enableContextHeaderInVisibility
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableContextHeaderInVisibility
// EnableCrossClusterEngine is used as an overall switch for the cross-cluster feature, a feature which, if not enabled
// can be quite expensive in terms of resources
// KeyName: history.enableCrossClusterEngine
Expand Down Expand Up @@ -4061,6 +4067,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain",
DefaultValue: false,
},
EnableContextHeaderInVisibility: {
KeyName: "history.enableContextHeaderInVisibility",
Filters: []Filter{DomainName},
Description: "EnableContextHeaderInVisibility is key for enable context header in visibility",
DefaultValue: false,
},
EnableCrossClusterEngine: {
KeyName: "history.enableCrossClusterEngine",
Description: "an overall toggle for the cross-cluster domain feature",
Expand Down
4 changes: 4 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ type Config struct {
EnableConsistentQueryByDomain dynamicconfig.BoolPropertyFnWithDomainFilter
MaxBufferedQueryCount dynamicconfig.IntPropertyFn

// EnableContextHeaderInVisibility whether to enable indexing context header in visibility
EnableContextHeaderInVisibility dynamicconfig.BoolPropertyFnWithDomainFilter

EnableCrossClusterEngine dynamicconfig.BoolPropertyFn
EnableCrossClusterOperationsForDomain dynamicconfig.BoolPropertyFnWithDomainFilter

Expand Down Expand Up @@ -570,6 +573,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain),
EnableContextHeaderInVisibility: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableContextHeaderInVisibility),
EnableCrossClusterEngine: dc.GetBoolProperty(dynamicconfig.EnableCrossClusterEngine),
EnableCrossClusterOperationsForDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperationsForDomain),
MaxBufferedQueryCount: dc.GetIntProperty(dynamicconfig.MaxBufferedQueryCount),
Expand Down
5 changes: 5 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,11 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes())
}
}
isCron := len(executionInfo.CronSchedule) > 0
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := t.shard.GetTimeSource().Now()
Expand Down
110 changes: 108 additions & 2 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,57 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTask()
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { return true }
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)
executionInfo := mutableState.GetExecutionInfo()
executionInfo.CronSchedule = "@every 5s"
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
startEvent.WorkflowExecutionStartedEventAttributes.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(5)

transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeRecordWorkflowStarted,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
if s.mockShard.GetConfig().EnableRecordWorkflowExecutionUninitialized(s.domainName) {
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionUninitialized",
mock.Anything,
createRecordWorkflowExecutionUninitializedRequest(transferTask, mutableState, s.mockShard.GetTimeSource().Now(), 1234),
).Once().Return(nil)
}
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
Expand Down Expand Up @@ -1727,7 +1777,47 @@ func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttribu
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttributesWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { return true }
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)

transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeUpsertWorkflowSearchAttributes,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
s.mockVisibilityMgr.On(
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
Expand Down Expand Up @@ -1834,6 +1924,7 @@ func createRecordWorkflowExecutionStartedRequest(
mutableState execution.MutableState,
numClusters int16,
updateTime time.Time,
enableContextHeaderInVisibility bool,
) *persistence.RecordWorkflowExecutionStartedRequest {
taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo)
workflowExecution := types.WorkflowExecution{
Expand All @@ -1846,6 +1937,12 @@ func createRecordWorkflowExecutionStartedRequest(
if backoffSeconds != 0 {
executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second)
}
var searchAttributes map[string][]byte
if enableContextHeaderInVisibility {
searchAttributes = map[string][]byte{
"Header.contextKey": []byte("contextValue"),
}
}
return &persistence.RecordWorkflowExecutionStartedRequest{
Domain: domainName,
DomainUUID: taskInfo.DomainID,
Expand All @@ -1859,6 +1956,7 @@ func createRecordWorkflowExecutionStartedRequest(
IsCron: len(executionInfo.CronSchedule) > 0,
NumClusters: numClusters,
UpdateTimestamp: updateTime.UnixNano(),
SearchAttributes: searchAttributes,
}
}

Expand Down Expand Up @@ -1975,6 +2073,7 @@ func createUpsertWorkflowSearchAttributesRequest(
mutableState execution.MutableState,
numClusters int16,
updateTime time.Time,
enableContextHeaderInVisibility bool,
) *persistence.UpsertWorkflowExecutionRequest {

taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo)
Expand All @@ -1988,6 +2087,12 @@ func createUpsertWorkflowSearchAttributesRequest(
if backoffSeconds != 0 {
executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second)
}
var searchAttributes map[string][]byte
if enableContextHeaderInVisibility {
searchAttributes = map[string][]byte{
"Header.contextKey": []byte("contextValue"),
}
}

return &persistence.UpsertWorkflowExecutionRequest{
Domain: domainName,
Expand All @@ -2002,6 +2107,7 @@ func createUpsertWorkflowSearchAttributesRequest(
IsCron: len(executionInfo.CronSchedule) > 0,
NumClusters: numClusters,
UpdateTimestamp: updateTime.UnixNano(),
SearchAttributes: searchAttributes,
}
}

Expand Down
8 changes: 7 additions & 1 deletion service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,6 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
startTimestamp := startEvent.GetTimestamp()
executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := t.shard.GetTimeSource().Now()

Expand All @@ -493,6 +492,13 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
}
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))

searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes())
}
}

if isRecordStart {
workflowStartedScope.IncCounter(metrics.WorkflowStartedCount)
return t.recordWorkflowStarted(
Expand Down
105 changes: 103 additions & 2 deletions service/history/task/transfer_standby_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
dc "github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/ndc"
Expand Down Expand Up @@ -800,7 +801,62 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTask(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool {
return true
}
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, err := test.StartWorkflow(s.mockShard, s.domainID)
s.NoError(err)
executionInfo := mutableState.GetExecutionInfo()
executionInfo.CronSchedule = "@every 5s"
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
startEvent.WorkflowExecutionStartedEventAttributes.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(5)

now := time.Now()
transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
VisibilityTimestamp: now,
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeRecordWorkflowStarted,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, startEvent.ID, startEvent.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
if s.mockShard.GetConfig().EnableRecordWorkflowExecutionUninitialized(s.domainName) {
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionUninitialized",
mock.Anything,
createRecordWorkflowExecutionUninitializedRequest(transferTask, mutableState, s.mockShard.GetTimeSource().Now(), 1234),
).Once().Return(nil)
}
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
Expand Down Expand Up @@ -834,7 +890,52 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttrib
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttributesTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool {
return true
}
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)

now := time.Now()
transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
VisibilityTimestamp: now,
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeUpsertWorkflowSearchAttributes,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
s.mockVisibilityMgr.On(
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
Expand Down
Loading
Loading