Skip to content

Commit

Permalink
Added feature to index context header in visibility (#6066)
Browse files Browse the repository at this point in the history
What changed?

Context headers are now searchable using visibility.

API
As users, they can search workflows by headers through List*Workflows / CountWorkflows with [SQL-like queries]

Header.uberctx-tenancy = "uber/testing" AND CloseStatus != "completed" ORDER BY StartTime DESC
Onboarding Steps
Add "Header.XXX" to frontend.validSearchAttributes
Enable "history.enableContextHeaderInVisibility"
Why?

Context headers are user propagated headers, including tracing baggages. Enable search-ability will enhance debugging and better observability.

How did you test it?

Unit Test
  • Loading branch information
shijiesheng authored May 29, 2024
1 parent f8765f0 commit 2f77e62
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 9 deletions.
9 changes: 5 additions & 4 deletions common/definition/indexedKeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,15 @@ const (
CadenceChangeVersion = "CadenceChangeVersion"
)

// valid non-indexed fields on ES
const (
// Memo is valid non-indexed fields on ES
Memo = "Memo"
// Attr is prefix of custom search attributes
Attr = "Attr"
// HeaderFormat is the format of context headers in search attributes
HeaderFormat = "Header.%s"
)

// Attr is prefix of custom search attributes
const Attr = "Attr"

// defaultIndexedKeys defines all searchable keys
var defaultIndexedKeys = createDefaultIndexedKeys()

Expand Down
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1695,6 +1695,12 @@ const (
// Default value: false
// Allowed filters: DomainName
EnableConsistentQueryByDomain
// EnableContextHeaderInVisibility is key for enable context header in visibility
// KeyName: history.enableContextHeaderInVisibility
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableContextHeaderInVisibility
// EnableCrossClusterEngine is used as an overall switch for the cross-cluster feature, a feature which, if not enabled
// can be quite expensive in terms of resources
// KeyName: history.enableCrossClusterEngine
Expand Down Expand Up @@ -4061,6 +4067,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableConsistentQueryByDomain indicates if consistent query is enabled for a domain",
DefaultValue: false,
},
EnableContextHeaderInVisibility: {
KeyName: "history.enableContextHeaderInVisibility",
Filters: []Filter{DomainName},
Description: "EnableContextHeaderInVisibility is key for enable context header in visibility",
DefaultValue: false,
},
EnableCrossClusterEngine: {
KeyName: "history.enableCrossClusterEngine",
Description: "an overall toggle for the cross-cluster domain feature",
Expand Down
4 changes: 4 additions & 0 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ type Config struct {
EnableConsistentQueryByDomain dynamicconfig.BoolPropertyFnWithDomainFilter
MaxBufferedQueryCount dynamicconfig.IntPropertyFn

// EnableContextHeaderInVisibility whether to enable indexing context header in visibility
EnableContextHeaderInVisibility dynamicconfig.BoolPropertyFnWithDomainFilter

EnableCrossClusterEngine dynamicconfig.BoolPropertyFn
EnableCrossClusterOperationsForDomain dynamicconfig.BoolPropertyFnWithDomainFilter

Expand Down Expand Up @@ -570,6 +573,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s

EnableConsistentQuery: dc.GetBoolProperty(dynamicconfig.EnableConsistentQuery),
EnableConsistentQueryByDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain),
EnableContextHeaderInVisibility: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableContextHeaderInVisibility),
EnableCrossClusterEngine: dc.GetBoolProperty(dynamicconfig.EnableCrossClusterEngine),
EnableCrossClusterOperationsForDomain: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableCrossClusterOperationsForDomain),
MaxBufferedQueryCount: dc.GetIntProperty(dynamicconfig.MaxBufferedQueryCount),
Expand Down
5 changes: 5 additions & 0 deletions service/history/task/transfer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,11 @@ func (t *transferActiveTaskExecutor) processRecordWorkflowStartedOrUpsertHelper(
executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes())
}
}
isCron := len(executionInfo.CronSchedule) > 0
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))
updateTimestamp := t.shard.GetTimeSource().Now()
Expand Down
110 changes: 108 additions & 2 deletions service/history/task/transfer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1696,7 +1696,57 @@ func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTask()
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferActiveTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { return true }
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)
executionInfo := mutableState.GetExecutionInfo()
executionInfo.CronSchedule = "@every 5s"
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
startEvent.WorkflowExecutionStartedEventAttributes.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(5)

transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeRecordWorkflowStarted,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
if s.mockShard.GetConfig().EnableRecordWorkflowExecutionUninitialized(s.domainName) {
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionUninitialized",
mock.Anything,
createRecordWorkflowExecutionUninitializedRequest(transferTask, mutableState, s.mockShard.GetTimeSource().Now(), 1234),
).Once().Return(nil)
}
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
Expand Down Expand Up @@ -1727,7 +1777,47 @@ func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttribu
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferActiveTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttributesWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool { return true }
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)

transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeUpsertWorkflowSearchAttributes,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
s.mockVisibilityMgr.On(
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Once().Return(nil)

err = s.transferActiveTaskExecutor.Execute(transferTask, true)
Expand Down Expand Up @@ -1834,6 +1924,7 @@ func createRecordWorkflowExecutionStartedRequest(
mutableState execution.MutableState,
numClusters int16,
updateTime time.Time,
enableContextHeaderInVisibility bool,
) *persistence.RecordWorkflowExecutionStartedRequest {
taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo)
workflowExecution := types.WorkflowExecution{
Expand All @@ -1846,6 +1937,12 @@ func createRecordWorkflowExecutionStartedRequest(
if backoffSeconds != 0 {
executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second)
}
var searchAttributes map[string][]byte
if enableContextHeaderInVisibility {
searchAttributes = map[string][]byte{
"Header.contextKey": []byte("contextValue"),
}
}
return &persistence.RecordWorkflowExecutionStartedRequest{
Domain: domainName,
DomainUUID: taskInfo.DomainID,
Expand All @@ -1859,6 +1956,7 @@ func createRecordWorkflowExecutionStartedRequest(
IsCron: len(executionInfo.CronSchedule) > 0,
NumClusters: numClusters,
UpdateTimestamp: updateTime.UnixNano(),
SearchAttributes: searchAttributes,
}
}

Expand Down Expand Up @@ -1975,6 +2073,7 @@ func createUpsertWorkflowSearchAttributesRequest(
mutableState execution.MutableState,
numClusters int16,
updateTime time.Time,
enableContextHeaderInVisibility bool,
) *persistence.UpsertWorkflowExecutionRequest {

taskInfo := transferTask.GetInfo().(*persistence.TransferTaskInfo)
Expand All @@ -1988,6 +2087,12 @@ func createUpsertWorkflowSearchAttributesRequest(
if backoffSeconds != 0 {
executionTimestamp = startEvent.GetTimestamp() + int64(backoffSeconds)*int64(time.Second)
}
var searchAttributes map[string][]byte
if enableContextHeaderInVisibility {
searchAttributes = map[string][]byte{
"Header.contextKey": []byte("contextValue"),
}
}

return &persistence.UpsertWorkflowExecutionRequest{
Domain: domainName,
Expand All @@ -2002,6 +2107,7 @@ func createUpsertWorkflowSearchAttributesRequest(
IsCron: len(executionInfo.CronSchedule) > 0,
NumClusters: numClusters,
UpdateTimestamp: updateTime.UnixNano(),
SearchAttributes: searchAttributes,
}
}

Expand Down
8 changes: 7 additions & 1 deletion service/history/task/transfer_standby_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,6 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
startTimestamp := startEvent.GetTimestamp()
executionTimestamp := getWorkflowExecutionTimestamp(mutableState, startEvent)
visibilityMemo := getWorkflowMemo(executionInfo.Memo)
searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
isCron := len(executionInfo.CronSchedule) > 0
updateTimestamp := t.shard.GetTimeSource().Now()

Expand All @@ -493,6 +492,13 @@ func (t *transferStandbyTaskExecutor) processRecordWorkflowStartedOrUpsertHelper
}
numClusters := (int16)(len(domainEntry.GetReplicationConfig().Clusters))

searchAttr := copySearchAttributes(executionInfo.SearchAttributes)
if t.config.EnableContextHeaderInVisibility(domainEntry.GetInfo().Name) {
if attributes := startEvent.GetWorkflowExecutionStartedEventAttributes(); attributes != nil && attributes.Header != nil {
searchAttr = appendContextHeaderToSearchAttributes(searchAttr, attributes.Header.Fields, t.config.ValidSearchAttributes())
}
}

if isRecordStart {
workflowStartedScope.IncCounter(metrics.WorkflowStartedCount)
return t.recordWorkflowStarted(
Expand Down
105 changes: 103 additions & 2 deletions service/history/task/transfer_standby_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/cluster"
dc "github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/ndc"
Expand Down Expand Up @@ -800,7 +801,62 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTask(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferStandbyTaskExecutorSuite) TestProcessRecordWorkflowStartedTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool {
return true
}
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, err := test.StartWorkflow(s.mockShard, s.domainID)
s.NoError(err)
executionInfo := mutableState.GetExecutionInfo()
executionInfo.CronSchedule = "@every 5s"
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
startEvent.WorkflowExecutionStartedEventAttributes.FirstDecisionTaskBackoffSeconds = common.Int32Ptr(5)

now := time.Now()
transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
VisibilityTimestamp: now,
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeRecordWorkflowStarted,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, startEvent.ID, startEvent.Version)
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
if s.mockShard.GetConfig().EnableRecordWorkflowExecutionUninitialized(s.domainName) {
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionUninitialized",
mock.Anything,
createRecordWorkflowExecutionUninitializedRequest(transferTask, mutableState, s.mockShard.GetTimeSource().Now(), 1234),
).Once().Return(nil)
}
s.mockVisibilityMgr.On(
"RecordWorkflowExecutionStarted",
mock.Anything,
createRecordWorkflowExecutionStartedRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
Expand Down Expand Up @@ -834,7 +890,52 @@ func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttrib
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now()),
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
false),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
err = s.transferStandbyTaskExecutor.Execute(transferTask, true)
s.Nil(err)
}

func (s *transferStandbyTaskExecutorSuite) TestProcessUpsertWorkflowSearchAttributesTaskWithContextHeader() {
// switch on context header in viz
s.mockShard.GetConfig().EnableContextHeaderInVisibility = func(domain string) bool {
return true
}
s.mockShard.GetConfig().ValidSearchAttributes = func(opts ...dc.FilterOption) map[string]interface{} {
return map[string]interface{}{
"Header.contextKey": struct{}{},
}
}

workflowExecution, mutableState, decisionCompletionID, err := test.SetupWorkflowWithCompletedDecision(s.mockShard, s.domainID)
s.NoError(err)

now := time.Now()
transferTask := s.newTransferTaskFromInfo(&persistence.TransferTaskInfo{
Version: s.version,
DomainID: s.domainID,
WorkflowID: workflowExecution.GetWorkflowID(),
RunID: workflowExecution.GetRunID(),
VisibilityTimestamp: now,
TaskID: int64(59),
TaskList: mutableState.GetExecutionInfo().TaskList,
TaskType: persistence.TransferTaskTypeUpsertWorkflowSearchAttributes,
})

persistenceMutableState, err := test.CreatePersistenceMutableState(mutableState, decisionCompletionID, mutableState.GetCurrentVersion())
s.NoError(err)
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
startEvent, err := mutableState.GetStartEvent(context.Background())
s.NoError(err)
s.mockVisibilityMgr.On(
"UpsertWorkflowExecution",
mock.Anything,
createUpsertWorkflowSearchAttributesRequest(
s.domainName, startEvent, transferTask, mutableState, 2, s.mockShard.GetTimeSource().Now(),
true),
).Return(nil).Once()

s.mockShard.SetCurrentTime(s.clusterName, now)
Expand Down
Loading

0 comments on commit 2f77e62

Please sign in to comment.