Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a sample call to TaskValidator in update workflow cycle #5634

Merged
merged 6 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1942,6 +1942,13 @@ const (

EnableTimerDebugLogByDomainID

// EnableTaskVal is which allows the taskvalidation to be enabled.
// KeyName: system.enableTaskVal
// Value type: Bool
// Default value: false
// Allowed filters: DomainID
EnableTaskVal

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4165,6 +4172,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "Enable log for debugging timer task issue by domain",
DefaultValue: false,
},
EnableTaskVal: DynamicBool{
KeyName: "system.enableTaskVal",
Description: "Enable TaskValidation",
DefaultValue: false,
},
WorkflowIDCacheEnabled: DynamicBool{
KeyName: "history.workflowIDCacheEnabled",
Filters: []Filter{DomainName},
Expand Down
15 changes: 13 additions & 2 deletions common/taskvalidator/validateworkflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ package taskvalidator
import (
"context"
"errors"
"time"

"go.uber.org/zap"

"github.com/uber/cadence/common/backoff"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
Expand Down Expand Up @@ -59,14 +61,23 @@ type checkerImpl struct {
// NewWfChecker creates a new instance of a workflow validation checker.
// It requires a logger, metrics client, domain cache, persistence retryer,
// and a stale checker implementation to function.
func NewWfChecker(logger *zap.Logger, metrics metrics.Client, domainCache cache.DomainCache, pr persistence.Retryer, staleCheck staleChecker) Checker {
func NewWfChecker(logger *zap.Logger, metrics metrics.Client, domainCache cache.DomainCache, executionManager persistence.ExecutionManager, historymanager persistence.HistoryManager) (Checker, error) {
// Create the persistence retryer
retryPolicy := backoff.NewExponentialRetryPolicy(100 * time.Millisecond) // Adjust as needed
pr := persistence.NewPersistenceRetryer(executionManager, historymanager, retryPolicy)

// Create the stale check instance
staleCheckInstance := invariant.NewStaleWorkflow(pr, domainCache, logger)
staleCheck, _ := staleCheckInstance.(staleChecker)

// Return the checker implementation
return &checkerImpl{
logger: logger,
metricsClient: metrics,
dc: domainCache,
pr: pr,
staleCheck: staleCheck,
}
}, nil
}

// WorkflowCheckforValidation performs workflow validation.
Expand Down
62 changes: 24 additions & 38 deletions common/taskvalidator/validateworkflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,50 +68,36 @@ func TestWorkflowCheckforValidation(t *testing.T) {
mockLogger := zap.NewNop()
mockMetricsClient := metrics.NewNoopMetricsClient()
mockDomainCache := cache.NewMockDomainCache(mockCtrl)
mockPersistenceRetryer := persistence.NewMockRetryer(mockCtrl)
mockStaleChecker := &mockStaleChecker{
CheckAgeFunc: func(response *persistence.GetWorkflowExecutionResponse) (bool, error) {
return tc.isStale, nil
},
}
checker := NewWfChecker(mockLogger, mockMetricsClient, mockDomainCache, mockPersistenceRetryer, mockStaleChecker)
mockDomainCache.EXPECT().
GetDomainByID(tc.domainID).
Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
// In each test case
mockDomainCache.EXPECT().
GetDomainName(gomock.Any()). // You can use gomock.Any() if the exact argument is not important
Return("test-domain-name", nil).AnyTimes()

// For test cases where deletion is expected
mockExecutionManager := persistence.NewMockExecutionManager(mockCtrl)
mockHistoryManager := persistence.NewMockHistoryManager(mockCtrl)

checker, err := NewWfChecker(mockLogger, mockMetricsClient, mockDomainCache, mockExecutionManager, mockHistoryManager)
assert.NoError(t, err, "Failed to create checker")

mockDomainCache.EXPECT().GetDomainByID(tc.domainID).Return(constants.TestGlobalDomainEntry, nil).AnyTimes()
mockDomainCache.EXPECT().GetDomainName(tc.domainID).Return(tc.domainName, nil).AnyTimes()

if tc.isStale {
mockPersistenceRetryer.EXPECT().
DeleteWorkflowExecution(gomock.Any(), gomock.Any()).
Return(nil).Times(1)
mockPersistenceRetryer.EXPECT().
DeleteCurrentWorkflowExecution(gomock.Any(), gomock.Any()).
Return(nil).Times(1)
mockExecutionManager.EXPECT().DeleteWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
mockExecutionManager.EXPECT().DeleteCurrentWorkflowExecution(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
}

mockPersistenceRetryer.EXPECT().
GetWorkflowExecution(gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) {
if tc.simulateError {
return nil, errors.New("database error")
}
// Return a valid response object to trigger the deletion calls
return &persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DomainID: constants.TestDomainID,
WorkflowID: constants.TestWorkflowID,
},
mockExecutionManager.EXPECT().GetWorkflowExecution(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, request *persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) {
if tc.simulateError {
return nil, errors.New("database error")
}
return &persistence.GetWorkflowExecutionResponse{
State: &persistence.WorkflowMutableState{
ExecutionInfo: &persistence.WorkflowExecutionInfo{
DomainID: constants.TestDomainID,
WorkflowID: constants.TestWorkflowID,
},
}, nil
}).AnyTimes()
},
}, nil
}).AnyTimes()

ctx := context.Background()
err := checker.WorkflowCheckforValidation(ctx, tc.workflowID, tc.domainID, tc.domainName, tc.runID)
err = checker.WorkflowCheckforValidation(ctx, tc.workflowID, tc.domainID, tc.domainName, tc.runID)

if tc.simulateError {
assert.Error(t, err, "Expected error when GetWorkflowExecution fails")
Expand Down
18 changes: 18 additions & 0 deletions service/history/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,13 @@ func (c *contextImpl) updateWorkflowExecutionWithRetry(
resp, err = c.shard.UpdateWorkflowExecution(ctx, request)
return err
}
//Preparation for the task Validation.
//metricsClient := c.shard.GetMetricsClient()
//domainCache := c.shard.GetDomainCache()
//executionManager := c.shard.GetExecutionManager()
//historymanager := c.shard.GetHistoryManager()
//zapLogger, _ := zap.NewProduction()
//checker, _ := taskvalidator.NewWfChecker(zapLogger, metricsClient, domainCache, executionManager, historymanager)
neil-xie marked this conversation as resolved.
Show resolved Hide resolved

isRetryable := func(err error) bool {
if _, ok := err.(*persistence.TimeoutError); ok {
Expand Down Expand Up @@ -1247,6 +1254,17 @@ func (c *contextImpl) updateWorkflowExecutionWithRetry(
tag.Error(err),
tag.Number(c.updateCondition),
)
//TODO: Call the Task Validation here so that it happens whenever an error happen during Update.
//err1 := checker.WorkflowCheckforValidation(
// ctx,
// c.workflowExecution.GetWorkflowID(),
// c.domainID,
// c.GetDomainName(),
// c.workflowExecution.GetRunID(),
//)
//if err1 != nil {
// return nil, err1
//}
return nil, err
}
}
Expand Down