Skip to content

Commit

Permalink
Add warn logs for shard closures (#3387)
Browse files Browse the repository at this point in the history
  • Loading branch information
emrahs authored and yux0 committed Jul 14, 2020
1 parent 8d428fc commit d69bcd4
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 6 deletions.
2 changes: 1 addition & 1 deletion service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3960,4 +3960,4 @@ func getWfIDRunIDTags(wf *gen.WorkflowExecution) []tag.Tag {
tag.WorkflowID(wf.GetWorkflowId()),
tag.WorkflowRunID(wf.GetRunId()),
}
}
}
60 changes: 55 additions & 5 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,11 @@ Create_Loop:
continue Create_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: CreateWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Create_Loop
}
Expand All @@ -517,6 +522,11 @@ Create_Loop:
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: CreateWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Create_Loop
}
Expand Down Expand Up @@ -595,6 +605,11 @@ Update_Loop:
continue Update_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: UpdateWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Update_Loop
}
Expand All @@ -610,6 +625,11 @@ Update_Loop:
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: UpdateWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Update_Loop
}
Expand Down Expand Up @@ -683,6 +703,11 @@ Reset_Loop:
continue Reset_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: ResetWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Reset_Loop
}
Expand All @@ -698,6 +723,11 @@ Reset_Loop:
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: ResetWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Reset_Loop
}
Expand Down Expand Up @@ -765,7 +795,7 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(
}
defer s.updateMaxReadLevelLocked(transferMaxReadLevel)

Reset_Loop:
Conflict_Resolve_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
Expand All @@ -781,11 +811,16 @@ Reset_Loop:
// RangeID might have been renewed by the same host while this update was in flight
// Retry the operation if we still have the shard ownership
if currentRangeID != s.getRangeID() {
continue Reset_Loop
continue Conflict_Resolve_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: ConflictResolveWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Reset_Loop
break Conflict_Resolve_Loop
}
}
default:
Expand All @@ -799,8 +834,13 @@ Reset_Loop:
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: ConflictResolveWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Reset_Loop
break Conflict_Resolve_Loop
}
}
}
Expand Down Expand Up @@ -932,10 +972,15 @@ func (s *contextImpl) renewRangeLocked(isStealing bool) error {
if err != nil {
// Shard is stolen, trigger history engine shutdown
if _, ok := err.(*persistence.ShardOwnershipLostError); ok {
s.logger.Warn(
"Closing shard: renewRangeLocked failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
} else {
// Failure in updating shard to grab new RangeID
s.logger.Error("Persistent store operation failure",
s.logger.Error("renewRangeLocked failed due to unknown error.",
tag.StoreOperationUpdateShard,
tag.Error(err),
tag.ShardRangeID(updatedShardInfo.RangeID),
Expand Down Expand Up @@ -987,6 +1032,11 @@ func (s *contextImpl) updateShardInfoLocked() error {
if err != nil {
// Shard is stolen, trigger history engine shutdown
if _, ok := err.(*persistence.ShardOwnershipLostError); ok {
s.logger.Warn(
"Closing shard: updateShardInfoLocked failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
}
} else {
Expand Down

0 comments on commit d69bcd4

Please sign in to comment.