Skip to content

Commit

Permalink
Adding heartbeat details in RespondActivityTaskFailed apis (#2560)
Browse files Browse the repository at this point in the history
* Adding heartbeat details in RespondActivityTaskFailed apis

* rework to be single transaction
  • Loading branch information
jbreiding authored Mar 8, 2022
1 parent 0a2d2e5 commit 11c67aa
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 15 deletions.
1 change: 1 addition & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[submodule "proto/api"]
path = proto/api
url = https://github.com/temporalio/api
branch = master
[submodule "develop/docker-compose/grafana/provisioning/temporalio-dashboards"]
path = develop/docker-compose/grafana/provisioning/temporalio-dashboards
url = https://github.com/temporalio/dashboards
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ require (
go.opentelemetry.io/otel/sdk v1.4.0
go.opentelemetry.io/otel/sdk/export/metric v0.27.0
go.opentelemetry.io/otel/sdk/metric v0.27.0
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a
go.temporal.io/api v1.7.1-0.20220308002655-96f7fbf5da40
go.temporal.io/sdk v1.13.0
go.temporal.io/version v0.3.0
go.uber.org/atomic v1.9.0
Expand Down Expand Up @@ -112,12 +112,12 @@ require (
go.opentelemetry.io/otel/trace v1.4.0 // indirect
go.uber.org/dig v1.13.0 // indirect
golang.org/x/crypto v0.0.0-20220210151621-f4118a5b28e2 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf // indirect
google.golang.org/genproto v0.0.0-20220307174427-659dce7fcb03 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
)
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,8 @@ go.opentelemetry.io/otel/trace v1.4.0 h1:4OOUrPZdVFQkbzl/JSdvGCWIdw5ONXXxzHlaLlW
go.opentelemetry.io/otel/trace v1.4.0/go.mod h1:uc3eRsqDfWs9R7b92xbQbU42/eTNz4N+gLP8qJCi4aE=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.temporal.io/api v1.6.1-0.20211110205628-60c98e9cbfe2/go.mod h1:IlUgOTGfmJuOkGrCZdptNxyXKE9CQz6oOx7/aH9bFY4=
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a h1:SgkeoCikBXMd/3fNNtymIfhpxk8o/E3zIZFBFkHzTtU=
go.temporal.io/api v1.7.1-0.20220223032354-6e6fe738916a/go.mod h1:OnUq5eS+Nyx+irKb3Ws5YB7yjGFf5XmI3WcVRU9COEo=
go.temporal.io/api v1.7.1-0.20220308002655-96f7fbf5da40 h1:1tmfg5qW4792JnltwiDa35mI5Ccv4Z8X4opbtLDias0=
go.temporal.io/api v1.7.1-0.20220308002655-96f7fbf5da40/go.mod h1:gsMe5hegA/qJoTlSUZ3fZ8dWZw5j0c7DoDeR2PPZOZY=
go.temporal.io/sdk v1.13.0 h1:8PW27o/uYAf1C1u8WUd6LNa6He2nYkBhdUX3c5gif5o=
go.temporal.io/sdk v1.13.0/go.mod h1:TCof7U/xas2FyDnx/UUEv4c/O/S41Lnhva+6JVer+Jo=
go.temporal.io/version v0.3.0 h1:dMrei9l9NyHt8nG6EB8vAwDLLTwx2SvRyucCSumAiig=
Expand Down Expand Up @@ -597,8 +597,8 @@ golang.org/x/net v0.0.0-20211109214657-ef0fda0de508/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -694,8 +694,8 @@ golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c h1:sSIdNI2Dd6vGv47bKc/xArpfxVmEz2+3j0E6I484xC4=
golang.org/x/sys v0.0.0-20220222200937-f2425489ef4c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7 h1:8IVLkfbr2cLhv0a/vKq4UFUcJym8RmDoDboxCFWEjYE=
golang.org/x/sys v0.0.0-20220307203707-22a9840ba4d7/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
Expand Down Expand Up @@ -891,8 +891,8 @@ google.golang.org/genproto v0.0.0-20220111164026-67b88f271998/go.mod h1:5CzLGKJ6
google.golang.org/genproto v0.0.0-20220114231437-d2e6a121cae0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220201184016-50beb8ab5c44/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220204002441-d6cc3cc0770e/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf h1:SVYXkUz2yZS9FWb2Gm8ivSlbNQzL2Z/NpPKE3RG2jWk=
google.golang.org/genproto v0.0.0-20220222213610-43724f9ea8cf/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
google.golang.org/genproto v0.0.0-20220307174427-659dce7fcb03 h1:roRqEkPqsCOa0ViO0CuSwDIVdttRbBMj8oU/2E7pI08=
google.golang.org/genproto v0.0.0-20220307174427-659dce7fcb03/go.mod h1:kGP+zUP2Ddo0ayMi4YuN7C3WZyJvGLZRh8Z5wnAqvEI=
google.golang.org/grpc v1.8.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.12.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
Expand Down
52 changes: 50 additions & 2 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,28 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
sizeLimitError := wh.config.BlobSizeLimitError(namespaceEntry.Name().String())
sizeLimitWarn := wh.config.BlobSizeLimitWarn(namespaceEntry.Name().String())

response := workflowservice.RespondActivityTaskFailedResponse{}

if request.GetLastHeartbeatDetails() != nil {
if err := common.CheckEventBlobSizeLimit(
request.GetLastHeartbeatDetails().Size(),
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
taskToken.GetWorkflowId(),
taskToken.GetRunId(),
wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
tag.BlobSizeViolationOperation("RespondActivityTaskFailed"),
); err != nil {
// heartbeat details exceed the size limit: report an explicit server failure in the response instead of recording them
response.Failures = append(response.Failures, failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true))

// do not send heartbeat to history service
request.LastHeartbeatDetails = nil
}
}

if err := common.CheckEventBlobSizeLimit(
request.GetFailure().Size(),
sizeLimitWarn,
Expand All @@ -1636,6 +1658,8 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, false)
serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn)
request.Failure = serverFailure

response.Failures = append(response.Failures, serverFailure)
}

_, err = wh.historyClient.RespondActivityTaskFailed(ctx, &historyservice.RespondActivityTaskFailedRequest{
Expand All @@ -1645,7 +1669,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailed(
if err != nil {
return nil, err
}
return &workflowservice.RespondActivityTaskFailedResponse{}, nil
return &response, nil
}

// RespondActivityTaskFailedById is called by application worker when it is done processing an ActivityTask.
Expand Down Expand Up @@ -1707,6 +1731,28 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re
sizeLimitError := wh.config.BlobSizeLimitError(namespaceEntry.Name().String())
sizeLimitWarn := wh.config.BlobSizeLimitWarn(namespaceEntry.Name().String())

response := workflowservice.RespondActivityTaskFailedByIdResponse{}

if request.GetLastHeartbeatDetails() != nil {
if err := common.CheckEventBlobSizeLimit(
request.GetLastHeartbeatDetails().Size(),
sizeLimitWarn,
sizeLimitError,
namespaceID.String(),
taskToken.GetWorkflowId(),
runID,
wh.metricsScope(ctx).Tagged(metrics.CommandTypeTag(enumspb.COMMAND_TYPE_UNSPECIFIED.String())),
wh.throttledLogger,
tag.BlobSizeViolationOperation("RespondActivityTaskFailedById"),
); err != nil {
// heartbeat details exceed the size limit: report an explicit server failure in the response instead of recording them
response.Failures = append(response.Failures, failure.NewServerFailure(common.FailureReasonHeartbeatExceedsLimit, true))

// do not send heartbeat to history service
request.LastHeartbeatDetails = nil
}
}

if err := common.CheckEventBlobSizeLimit(
request.GetFailure().Size(),
sizeLimitWarn,
Expand All @@ -1721,6 +1767,8 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re
serverFailure := failure.NewServerFailure(common.FailureReasonFailureExceedsLimit, false)
serverFailure.Cause = failure.Truncate(request.Failure, sizeLimitWarn)
request.Failure = serverFailure

response.Failures = append(response.Failures, serverFailure)
}

req := &workflowservice.RespondActivityTaskFailedRequest{
Expand All @@ -1736,7 +1784,7 @@ func (wh *WorkflowHandler) RespondActivityTaskFailedById(ctx context.Context, re
if err != nil {
return nil, err
}
return &workflowservice.RespondActivityTaskFailedByIdResponse{}, nil
return &response, nil
}

// RespondActivityTaskCanceled is called by application worker when it has successfully canceled an ActivityTask. It will
Expand Down
12 changes: 12 additions & 0 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -1569,6 +1569,18 @@ func (e *historyEngineImpl) RespondActivityTaskFailed(
return nil, consts.ErrActivityTaskNotFound
}

e.logger.Debug("RespondActivityTaskFailed", tag.WorkflowScheduleID(scheduleID), tag.ActivityInfo(ai), tag.NewBoolTag("hasHeartbeatDetails", request.GetLastHeartbeatDetails() != nil))

if request.GetLastHeartbeatDetails() != nil {
// Save heartbeat details as progress
mutableState.UpdateActivityProgress(ai, &workflowservice.RecordActivityTaskHeartbeatRequest{
TaskToken: request.GetTaskToken(),
Details: request.GetLastHeartbeatDetails(),
Identity: request.GetIdentity(),
Namespace: request.GetNamespace(),
})
}

postActions := &updateWorkflowAction{}
failure := request.GetFailure()
retryState, err := mutableState.RetryActivity(ai, failure)
Expand Down
65 changes: 65 additions & 0 deletions service/history/historyEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3169,6 +3169,71 @@ func (s *engineSuite) TestRespondActivityTaskFailedSuccess() {
s.Equal(common.EmptyEventID, di.StartedID)
}

// TestRespondActivityTaskFailedWithHeartbeatSuccess drives the history engine's
// RespondActivityTaskFailed with a request that carries LastHeartbeatDetails.
//
// It builds a mutable state containing one started activity, submits the
// failure, and then asserts that the call succeeds, that a new workflow task is
// pending, and that the activity info now exposes heartbeat details that were
// absent before the call.
func (s *engineSuite) TestRespondActivityTaskFailedWithHeartbeatSuccess() {
	const (
		taskQueue      = "testTaskQueue"
		workerIdentity = "testIdentity"
		activityID     = "activity1_id"
		activityType   = "activity_type1"
		// The same 100s value is passed for all three timeout arguments of the
		// workflow started event.
		wfTimeout = 100 * time.Second
	)

	execution := commonpb.WorkflowExecution{
		WorkflowId: tests.WorkflowID,
		RunId:      tests.RunID,
	}

	token := &tokenspb.Task{
		ScheduleAttempt: 1,
		WorkflowId:      execution.WorkflowId,
		RunId:           execution.RunId,
		ScheduleId:      5,
	}
	// The marshal error is ignored here.
	serializedToken, _ := token.Marshal()

	activityFailure := failure.NewServerFailure("failed", false)

	// Build the workflow history up to a started activity.
	mutableState := workflow.TestLocalMutableState(
		s.mockHistoryEngine.shard,
		s.eventsCache,
		tests.LocalNamespaceEntry,
		log.NewTestLogger(),
		execution.GetRunId(),
	)
	addWorkflowExecutionStartedEvent(mutableState, execution, "wType", taskQueue, payloads.EncodeString("input"), wfTimeout, wfTimeout, wfTimeout, workerIdentity)
	scheduledTask := addWorkflowTaskScheduledEvent(mutableState)
	taskStarted := addWorkflowTaskStartedEvent(mutableState, scheduledTask.ScheduleID, taskQueue, workerIdentity)
	taskCompleted := addWorkflowTaskCompletedEvent(mutableState, scheduledTask.ScheduleID, taskStarted.EventId, workerIdentity)
	activityScheduled, activityInfo := addActivityTaskScheduledEvent(
		mutableState,
		taskCompleted.EventId,
		activityID,
		activityType,
		taskQueue,
		payloads.EncodeString("input1"),
		100*time.Second,
		10*time.Second,
		1*time.Second,
		5*time.Second,
	)
	addActivityTaskStartedEvent(mutableState, activityScheduled.EventId, workerIdentity)

	// Serve the prepared state from the mocked execution manager and accept
	// one update.
	persisted := workflow.TestCloneToProto(mutableState)
	persisted.ActivityInfos[activityInfo.ScheduleId] = activityInfo
	s.mockExecutionMgr.EXPECT().GetWorkflowExecution(gomock.Any()).Return(
		&persistence.GetWorkflowExecutionResponse{State: persisted}, nil)
	s.mockExecutionMgr.EXPECT().UpdateWorkflowExecution(gomock.Any()).Return(tests.UpdateWorkflowExecutionResponse, nil)

	heartbeatDetails := payloads.EncodeString("details")

	// Precondition: no heartbeat details recorded on the activity yet.
	s.Nil(activityInfo.GetLastHeartbeatDetails())

	request := &historyservice.RespondActivityTaskFailedRequest{
		NamespaceId: tests.NamespaceID.String(),
		FailedRequest: &workflowservice.RespondActivityTaskFailedRequest{
			TaskToken:            serializedToken,
			Failure:              activityFailure,
			Identity:             workerIdentity,
			LastHeartbeatDetails: heartbeatDetails,
		},
	}
	err := s.mockHistoryEngine.RespondActivityTaskFailed(context.Background(), request)
	s.Nil(err)

	// The workflow is still running and has advanced to next event ID 9.
	resultState := s.getBuilder(tests.NamespaceID, execution)
	s.Equal(int64(9), resultState.GetNextEventID())
	s.Equal(int64(3), resultState.GetExecutionInfo().LastWorkflowTaskStartId)
	s.Equal(enumsspb.WORKFLOW_EXECUTION_STATE_RUNNING, resultState.GetExecutionState().State)

	// A workflow task with schedule ID 8 is pending and not yet started.
	s.True(resultState.HasPendingWorkflowTask())
	pendingTask, found := resultState.GetWorkflowTaskInfo(int64(8))
	s.True(found)
	s.EqualValues(int64(100), pendingTask.WorkflowTaskTimeout.Seconds())
	s.Equal(int64(8), pendingTask.ScheduleID)
	s.Equal(common.EmptyEventID, pendingTask.StartedID)

	// Postcondition: the heartbeat details from the request are now present.
	s.NotNil(activityInfo.GetLastHeartbeatDetails())
}

func (s *engineSuite) TestRespondActivityTaskFailedByIdSuccess() {

we := commonpb.WorkflowExecution{
Expand Down

0 comments on commit 11c67aa

Please sign in to comment.