-
Notifications
You must be signed in to change notification settings - Fork 805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add signal external workflow decision #485
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed IDL and schema changes. Will do the engine changes tomorrow.
@@ -109,6 +110,9 @@ enum EventType { | |||
RequestCancelExternalWorkflowExecutionFailed, | |||
ExternalWorkflowExecutionCancelRequested, | |||
MarkerRecorded, | |||
SignalExternalWorkflowExecutionInitiated, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will break existing clients. Please add new values at the end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good to know. Thanks.
@@ -132,6 +136,7 @@ enum DecisionTaskFailedCause { | |||
BAD_COMPLETE_WORKFLOW_EXECUTION_ATTRIBUTES, | |||
BAD_FAIL_WORKFLOW_EXECUTION_ATTRIBUTES, | |||
BAD_CANCEL_WORKFLOW_EXECUTION_ATTRIBUTES, | |||
BAD_SIGNAL_WORKFLOW_EXECUTION_ATTRIBUTES, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add new enums towards the very end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
60: optional binary control | ||
} | ||
|
||
struct ExternalWorkflowExecutionSignalRequestedEventAttributes { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably rename it to ExternalWorkflowExecutionSignaledEventAttributes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will probably not change this, because the event type is EventTypeExternalWorkflowExecutionSignalRequested
. If I rename this to ExternalWorkflowExecutionSignaledEventAttributes, then I probably need to change the event name to EventTypeExternalWorkflowExecutionSignaled
, but we try to be consistent with SWF so wouldn't change event name.
I'm still open minded, let me know if you still prefer new name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SWF is using ExternalWorkflowExecutionSignaled.
struct DeleteWorkflowExecutionSignalRequest { | ||
10: optional string domain | ||
20: optional WorkflowExecution workflowExecution | ||
30: optional string identity |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably identity does not make sense here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, removed.
* DeleteWorkflowExecutionSignal is used to delete a signal request ID that was previously recorded. This is currently | ||
* used to clean execution info when signal decision finished. | ||
**/ | ||
void DeleteWorkflowExecutionSignal(1: DeleteWorkflowExecutionSignalRequest deleteRequest) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's rename this to RemoveSignalMutableState
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -0,0 +1,9 @@ | |||
CREATE TYPE signal_info ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should go into v0.3. We have already released v0.2.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will move when rebase
common/persistence/dataInterfaces.go
Outdated
TargetDomainID string | ||
TargetWorkflowID string | ||
TargetRunID string | ||
ScheduleID int64 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe rename to InitiatedID
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
InitiatedID int64 | ||
SignalRequestID string | ||
SignalName string | ||
Input []byte |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You probably need to store the entire SignalInitiatedEvent here.
common/persistence/dataInterfaces.go
Outdated
@@ -655,6 +686,7 @@ type ( | |||
GetWorkflowExecution(request *GetWorkflowExecutionRequest) (*GetWorkflowExecutionResponse, error) | |||
UpdateWorkflowExecution(request *UpdateWorkflowExecutionRequest) error | |||
DeleteWorkflowExecution(request *DeleteWorkflowExecutionRequest) error | |||
DeleteSignalRequestedID(request *DeleteWorkflowExecutionSignalRequestedRequest) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this is a new API but DeleteSignalInfo is part of update?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
`and workflow_id = ? ` + | ||
`and run_id = ? ` + | ||
`and visibility_ts = ? ` + | ||
`and task_id = ? ` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is also updating the mutable state. Make sure it is protected by conditional check on range_id and next_event_id.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed, protected by using updateWorkflowExecution.
return event | ||
} | ||
|
||
func (b *historyBuilder) newSignalExternalWorkflowExecutionFailedEvent(decisionTaskCompletedEventID, initiatedEventID int64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You never set control on this event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
service/history/historyBuilder.go
Outdated
return event | ||
} | ||
|
||
func (b *historyBuilder) newExternalWorkflowExecutionSignalRequestedEvent(initiatedEventID int64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You never set control on this event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
service/history/historyEngine.go
Outdated
transferTasks = append(transferTasks, &persistence.SignalExecutionTask{ | ||
TargetDomainID: foreignInfo.ID, | ||
TargetWorkflowID: attributes.GetWorkflowId(), | ||
TargetRunID: common.StringDefault(attributes.RunId), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the generated getter: attributes.GetRunId()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
service/history/historyEngine.go
Outdated
@@ -1528,6 +1560,17 @@ func (e *historyEngineImpl) SignalWorkflowExecution(signalRequest *h.SignalWorkf | |||
return &workflow.EntityNotExistsError{Message: "Workflow execution already completed."} | |||
} | |||
|
|||
// deduplicate by request id for signal decision | |||
if request.RequestId != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be simplified if you use the generated getter.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed, good to know
service/history/historyEngine.go
Outdated
@@ -1536,6 +1579,42 @@ func (e *historyEngineImpl) SignalWorkflowExecution(signalRequest *h.SignalWorkf | |||
}) | |||
} | |||
|
|||
func (e *historyEngineImpl) DeleteWorkflowExecutionSignal(deleteRequest *h.DeleteWorkflowExecutionSignalRequest) error { | |||
domainID, err := getDomainUUID(deleteRequest.DomainUUID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets use the updateWorkflowExecution helper defined in historyEngineImpl for making this update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed.
return nil | ||
} | ||
|
||
if e.DeletePendingSignal(initiatedID) == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if this is not nil?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question. We have some options:
- add new
PendingSignalNotExistEvent
event when err happens - add success (or failed) event no matter err happens or not
- add nothing when err happens (current implementation) .
Which one do you like best?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added log if err happens.
return nil | ||
} | ||
|
||
if e.DeletePendingSignal(initiatedID) == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is this is not nil?
}, | ||
} | ||
|
||
op := func() error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you just create a helper for SignalExecutionWithRetry.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed
initiatedEventID := task.ScheduleID | ||
ri, isRunning := msBuilder.GetSignalInfo(initiatedEventID) | ||
if !isRunning { | ||
return nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You still want to call delete on the target to make sure we don't leak requestIDs in mutable state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In which case leak can happens?
I found it hard to delete target here because no ri.SignalRequestId can be obtained here.
|
||
// Check to see if the error is non-transient, in which case add SignalFailed | ||
// event and complete transfer task by setting the err = nil | ||
if common.IsServiceNonRetryableError(err) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sure to add a test for expected error types here. Otherwise this transfer task can go into infinite loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added tests for transferQueueProcessor
related #429
client side change: cadence-workflow/cadence-go-client#329