From 98b9f59f07a18ec609f5d8e3c1897660581a3c81 Mon Sep 17 00:00:00 2001 From: Max K <25259015+mkolodezny@users.noreply.github.com> Date: Tue, 20 Oct 2020 14:20:57 -0700 Subject: [PATCH] Add task token to activityDispatchInfo for worker (#3672) --- service/history/decisionHandler.go | 1 + service/history/decisionTaskHandler.go | 19 +++++++++++++++++++ service/history/historyEngine.go | 2 ++ 3 files changed, 22 insertions(+) diff --git a/service/history/decisionHandler.go b/service/history/decisionHandler.go index 52a0ad584ed..bec9d21e7cf 100644 --- a/service/history/decisionHandler.go +++ b/service/history/decisionHandler.go @@ -417,6 +417,7 @@ Update_History_Loop: msBuilder, handler.decisionAttrValidator, workflowSizeChecker, + handler.tokenSerializer, handler.logger, handler.domainCache, handler.metricsClient, diff --git a/service/history/decisionTaskHandler.go b/service/history/decisionTaskHandler.go index e6a25dda8be..b9d924a5a4b 100644 --- a/service/history/decisionTaskHandler.go +++ b/service/history/decisionTaskHandler.go @@ -59,6 +59,8 @@ type ( attrValidator *decisionAttrValidator sizeLimitChecker *workflowSizeChecker + tokenSerializer common.TaskTokenSerializer + logger log.Logger domainCache cache.DomainCache metricsClient metrics.Client @@ -77,6 +79,7 @@ func newDecisionTaskHandler( mutableState execution.MutableState, attrValidator *decisionAttrValidator, sizeLimitChecker *workflowSizeChecker, + tokenSerializer common.TaskTokenSerializer, logger log.Logger, domainCache cache.DomainCache, metricsClient metrics.Client, @@ -102,6 +105,8 @@ func newDecisionTaskHandler( attrValidator: attrValidator, sizeLimitChecker: sizeLimitChecker, + tokenSerializer: tokenSerializer, + logger: logger, domainCache: domainCache, metricsClient: metricsClient, @@ -249,6 +254,20 @@ func (handler *decisionTaskHandlerImpl) handleDecisionScheduleActivity( if _, err1 := handler.mutableState.AddActivityTaskStartedEvent(ai, event.GetEventId(), uuid.New(), handler.identity); err1 != nil { return nil, err1 } + token := &common.TaskToken{ + DomainID: executionInfo.DomainID, + WorkflowID: executionInfo.WorkflowID, + WorkflowType: executionInfo.WorkflowTypeName, + RunID: executionInfo.RunID, + ScheduleID: ai.ScheduleID, + ScheduleAttempt: 0, + ActivityID: ai.ActivityID, + ActivityType: attr.ActivityType.GetName(), + } + activityDispatchInfo.TaskToken, err = handler.tokenSerializer.Serialize(token) + if err != nil { + return nil, ErrSerializingToken + } activityDispatchInfo.ScheduledTimestamp = common.Int64Ptr(ai.ScheduledTime.UnixNano()) activityDispatchInfo.ScheduledTimestampOfThisAttempt = common.Int64Ptr(ai.ScheduledTime.UnixNano()) activityDispatchInfo.StartedTimestamp = common.Int64Ptr(ai.StartedTime.UnixNano()) diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 26d96957ea2..112c49901d9 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -135,6 +135,8 @@ var ( ErrWorkflowParent = &workflow.EntityNotExistsError{Message: "workflow parent does not match"} // ErrDeserializingToken is the error to indicate task token is invalid ErrDeserializingToken = &workflow.BadRequestError{Message: "error deserializing task token"} + // ErrSerializingToken is the error to indicate task token can not be serialized + ErrSerializingToken = &workflow.BadRequestError{Message: "error serializing task token"} // ErrSignalOverSize is the error to indicate signal input size is > 256K ErrSignalOverSize = &workflow.BadRequestError{Message: "signal input size is over 256K"} // ErrCancellationAlreadyRequested is the error indicating cancellation for target workflow is already requested