Skip to content

Commit

Permalink
Add task token to activityDispatchInfo for worker (cadence-workflow#3672
Browse files Browse the repository at this point in the history
)
  • Loading branch information
mkolodezny authored Feb 4, 2021
1 parent 4787d1c commit 98b9f59
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 0 deletions.
1 change: 1 addition & 0 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,7 @@ Update_History_Loop:
msBuilder,
handler.decisionAttrValidator,
workflowSizeChecker,
handler.tokenSerializer,
handler.logger,
handler.domainCache,
handler.metricsClient,
Expand Down
19 changes: 19 additions & 0 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type (
attrValidator *decisionAttrValidator
sizeLimitChecker *workflowSizeChecker

tokenSerializer common.TaskTokenSerializer

logger log.Logger
domainCache cache.DomainCache
metricsClient metrics.Client
Expand All @@ -77,6 +79,7 @@ func newDecisionTaskHandler(
mutableState execution.MutableState,
attrValidator *decisionAttrValidator,
sizeLimitChecker *workflowSizeChecker,
tokenSerializer common.TaskTokenSerializer,
logger log.Logger,
domainCache cache.DomainCache,
metricsClient metrics.Client,
Expand All @@ -102,6 +105,8 @@ func newDecisionTaskHandler(
attrValidator: attrValidator,
sizeLimitChecker: sizeLimitChecker,

tokenSerializer: tokenSerializer,

logger: logger,
domainCache: domainCache,
metricsClient: metricsClient,
Expand Down Expand Up @@ -249,6 +254,20 @@ func (handler *decisionTaskHandlerImpl) handleDecisionScheduleActivity(
if _, err1 := handler.mutableState.AddActivityTaskStartedEvent(ai, event.GetEventId(), uuid.New(), handler.identity); err1 != nil {
return nil, err1
}
token := &common.TaskToken{
DomainID: executionInfo.DomainID,
WorkflowID: executionInfo.WorkflowID,
WorkflowType: executionInfo.WorkflowTypeName,
RunID: executionInfo.RunID,
ScheduleID: ai.ScheduleID,
ScheduleAttempt: 0,
ActivityID: ai.ActivityID,
ActivityType: attr.ActivityType.GetName(),
}
activityDispatchInfo.TaskToken, err = handler.tokenSerializer.Serialize(token)
if err != nil {
return nil, ErrSerializingToken
}
activityDispatchInfo.ScheduledTimestamp = common.Int64Ptr(ai.ScheduledTime.UnixNano())
activityDispatchInfo.ScheduledTimestampOfThisAttempt = common.Int64Ptr(ai.ScheduledTime.UnixNano())
activityDispatchInfo.StartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano())
Expand Down
2 changes: 2 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ var (
ErrWorkflowParent = &workflow.EntityNotExistsError{Message: "workflow parent does not match"}
// ErrDeserializingToken is the error to indicate task token is invalid
ErrDeserializingToken = &workflow.BadRequestError{Message: "error deserializing task token"}
// ErrSerializingToken is the error to indicate task token can not be serialized
ErrSerializingToken = &workflow.BadRequestError{Message: "error serializing task token"}
// ErrSignalOverSize is the error to indicate signal input size is > 256K
ErrSignalOverSize = &workflow.BadRequestError{Message: "signal input size is over 256K"}
// ErrCancellationAlreadyRequested is the error indicating cancellation for target workflow is already requested
Expand Down

0 comments on commit 98b9f59

Please sign in to comment.