Skip to content

Commit

Permalink
Added feature to index context header in visibility (cadence-workflow…
Browse files Browse the repository at this point in the history
…#6066)

What changed?

Context headers are now searchable using visibility.

API
As users, they can search workflows by headers through List*Workflows / CountWorkflows with [SQL-like queries]

Header.uberctx-tenancy = "uber/testing" AND CloseStatus != "completed" ORDER BY StartTime DESC
Onboarding Steps
Add "Header.XXX" to frontend.validSearchAttributes
Enable "history.enableContextHeaderInVisibility"
Why?

Context headers are user propagated headers, including tracing baggages. Enable search-ability will enhance debugging and better observability.

How did you test it?

Unit Test
  • Loading branch information
shijiesheng authored and timl3136 committed Jun 6, 2024
1 parent 303c5bf commit 6a2b4a2
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 9 deletions.
9 changes: 5 additions & 4 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ const (
CadenceChangeVersion = "CadenceChangeVersion"
)

// valid non-indexed fields on ES
const (
// Memo is valid non-indexed fields on ES
Memo = "Memo"
// Attr is prefix of custom search attributes
Attr = "Attr"
// HeaderFormat is the format of context headers in search attributes
HeaderFormat = "Header.%s"
)

// Attr is prefix of custom search attributes
const Attr = "Attr"

// defaultIndexedKeys defines all searchable keys
var defaultIndexedKeys = createDefaultIndexedKeys()

Expand Down
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,12 @@ const (
// Default value: false
// Allowed filters: DomainName
EnableConsistentQueryByDomain
// EnableContextHeaderInVisibility is key for enable context header in visibility
// KeyName: history.enableContextHeaderInVisibility
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableContextHeaderInVisibility
// EnableCrossClusterEngine is used as an overall switch for the cross-cluster feature, a feature which, if not enabled
// can be quite expensive in terms of resources
// KeyName: history.enableCrossClusterEngine
Expand Down Expand Up @@ -4061,6 +4067,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain",
DefaultValue: false,
},
EnableContextHeaderInVisibility: {
KeyName: "history.enableContextHeaderInVisibility",
Filters: []Filter{DomainName},
Description: "EnableContextHeaderInVisibility is key for enable context header in visibility",
DefaultValue: false,
},
EnableCrossClusterEngine: {
KeyName: "history.enableCrossClusterEngine",
Description: "an overall toggle for the cross-cluster domain feature",
Expand Down
4 changes: 4 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ type Config struct {
EnableConsistentQueryByDomain dynamicconfig.BoolPropertyFnWithDomainFilter
MaxBufferedQueryCount dynamicconfig.IntPropertyFn

// EnableContextHeaderInVisibility whether to enable indexing context header in visibility
EnableContextHeaderInVisibility dynamicconfig.BoolPropertyFnWithDomainFilter

EnableCrossClusterEngine dynamicconfig.BoolPropertyFn
EnableCrossClusterOperationsForDomain dynamicconfig.BoolPropertyFnWithDomainFilter

Expand Down Expand Up @@ -570,6 +573,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain),
EnableContextHeaderInVisibility: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableContextHeaderInVisibility),
EnableCrossClusterEngine: dc.GetBoolProperty(dynamicconfig.EnableCrossClusterEngine),
EnableCrossClusterOperationsForDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperationsForDomain),
MaxBufferedQueryCount: dc.GetIntProperty(dynamicconfig.MaxBufferedQueryCount),
Expand Down
5 changes: 5 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,11 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes())
}
}
isCron := len(executionInfo.CronSchedule) > 0
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := t.shard.GetTimeSource().Now()
Expand Down
110 changes: 108 additions & 2 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,57 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTask()
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { return true }
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)
executionInfo := mutableState.GetExecutionInfo()
executionInfo.CronSchedule = "@every 5s"
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
startEvent.WorkflowExecutionStartedEventAttributes.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(5)

transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeRecordWorkflowStarted,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
if s.mockShard.GetConfig().EnableRecordWorkflowExecutionUninitialized(s.domainName) {
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionUninitialized",
mock.Anything,
createRecordWorkflowExecutionUninitializedRequest(transferTask, mutableState, s.mockShard.GetTimeSource().Now(), 1234),
).Once().Return(nil)
}
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
Expand Down Expand Up @@ -1727,7 +1777,47 @@ func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttribu
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttributesWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { return true }
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)

transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeUpsertWorkflowSearchAttributes,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
s.mockVisibilityMgr.On(
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
Expand Down Expand Up @@ -1834,6 +1924,7 @@ func createRecordWorkflowExecutionStartedRequest(
mutableState execution.MutableState,
numClusters int16,
updateTime time.Time,
enableContextHeaderInVisibility bool,
) *persistence.RecordWorkflowExecutionStartedRequest {
taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo)
workflowExecution := types.WorkflowExecution{
Expand All @@ -1846,6 +1937,12 @@ func createRecordWorkflowExecutionStartedRequest(
if backoffSeconds != 0 {
executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second)
}
var searchAttributes map[string][]byte
if enableContextHeaderInVisibility {
searchAttributes = map[string][]byte{
"Header.contextKey": []byte("contextValue"),
}
}
return &persistence.RecordWorkflowExecutionStartedRequest{
Domain: domainName,
DomainUUID: taskInfo.DomainID,
Expand All @@ -1859,6 +1956,7 @@ func createRecordWorkflowExecutionStartedRequest(
IsCron: len(executionInfo.CronSchedule) > 0,
NumClusters: numClusters,
UpdateTimestamp: updateTime.UnixNano(),
SearchAttributes: searchAttributes,
}
}

Expand Down Expand Up @@ -1975,6 +2073,7 @@ func createUpsertWorkflowSearchAttributesRequest(
mutableState execution.MutableState,
numClusters int16,
updateTime time.Time,
enableContextHeaderInVisibility bool,
) *persistence.UpsertWorkflowExecutionRequest {

taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo)
Expand All @@ -1988,6 +2087,12 @@ func createUpsertWorkflowSearchAttributesRequest(
if backoffSeconds != 0 {
executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second)
}
var searchAttributes map[string][]byte
if enableContextHeaderInVisibility {
searchAttributes = map[string][]byte{
"Header.contextKey": []byte("contextValue"),
}
}

return &persistence.UpsertWorkflowExecutionRequest{
Domain: domainName,
Expand All @@ -2002,6 +2107,7 @@ func createUpsertWorkflowSearchAttributesRequest(
IsCron: len(executionInfo.CronSchedule) > 0,
NumClusters: numClusters,
UpdateTimestamp: updateTime.UnixNano(),
SearchAttributes: searchAttributes,
}
}

Expand Down
8 changes: 7 additions & 1 deletion service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,6 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
startTimestamp := startEvent.GetTimestamp()
executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := t.shard.GetTimeSource().Now()

Expand All @@ -493,6 +492,13 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
}
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))

searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes())
}
}

if isRecordStart {
workflowStartedScope.IncCounter(metrics.WorkflowStartedCount)
return t.recordWorkflowStarted(
Expand Down
105 changes: 103 additions & 2 deletions service/history/task/transfer_standby_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
dc "github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/ndc"
Expand Down Expand Up @@ -800,7 +801,62 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTask(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool {
return true
}
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, err := test.StartWorkflow(s.mockShard, s.domainID)
s.NoError(err)
executionInfo := mutableState.GetExecutionInfo()
executionInfo.CronSchedule = "@every 5s"
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
startEvent.WorkflowExecutionStartedEventAttributes.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(5)

now := time.Now()
transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
VisibilityTimestamp: now,
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeRecordWorkflowStarted,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, startEvent.ID, startEvent.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
if s.mockShard.GetConfig().EnableRecordWorkflowExecutionUninitialized(s.domainName) {
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionUninitialized",
mock.Anything,
createRecordWorkflowExecutionUninitializedRequest(transferTask, mutableState, s.mockShard.GetTimeSource().Now(), 1234),
).Once().Return(nil)
}
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
Expand Down Expand Up @@ -834,7 +890,52 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttrib
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttributesTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool {
return true
}
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)

now := time.Now()
transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
VisibilityTimestamp: now,
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeUpsertWorkflowSearchAttributes,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
s.mockVisibilityMgr.On(
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
Expand Down
Loading

0 comments on commit 6a2b4a2

Please sign in to comment.