Skip to content

Commit

Permalink
Move serialization of datablobs to manager for shardManager (#3709)
Browse files Browse the repository at this point in the history
Co-authored-by: Andrew Dawson <[email protected]>
  • Loading branch information
anish531213 and andrewjdawson2016 authored Oct 30, 2020
1 parent d47b0d1 commit 88fd2c5
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 100 deletions.
39 changes: 21 additions & 18 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (
"strings"
"time"

"github.com/uber/cadence/.gen/go/history"
"github.com/uber/cadence/.gen/go/replicator"

"github.com/pborman/uuid"

workflow "github.com/uber/cadence/.gen/go/shared"
Expand Down Expand Up @@ -243,24 +246,24 @@ type (

// ShardInfo describes a shard
ShardInfo struct {
ShardID int `json:"shard_id"`
Owner string `json:"owner"`
RangeID int64 `json:"range_id"`
StolenSinceRenew int `json:"stolen_since_renew"`
UpdatedAt time.Time `json:"updated_at"`
ReplicationAckLevel int64 `json:"replication_ack_level"`
ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
TransferAckLevel int64 `json:"transfer_ack_level"`
TimerAckLevel time.Time `json:"timer_ack_level"`
ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
TransferProcessingQueueStates *DataBlob `json:"transfer_processing_queue_states"`
TimerProcessingQueueStates *DataBlob `json:"timer_processing_queue_states"`
TransferFailoverLevels map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
TimerFailoverLevels map[string]TimerFailoverLevel // uuid -> TimerFailoverLevel
ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
DomainNotificationVersion int64 `json:"domain_notification_version"`
PendingFailoverMarkers *DataBlob `json:"pending_failover_markers"`
ShardID int `json:"shard_id"`
Owner string `json:"owner"`
RangeID int64 `json:"range_id"`
StolenSinceRenew int `json:"stolen_since_renew"`
UpdatedAt time.Time `json:"updated_at"`
ReplicationAckLevel int64 `json:"replication_ack_level"`
ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"`
TransferAckLevel int64 `json:"transfer_ack_level"`
TimerAckLevel time.Time `json:"timer_ack_level"`
ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"`
ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"`
TransferProcessingQueueStates *history.ProcessingQueueStates `json:"transfer_processing_queue_states"`
TimerProcessingQueueStates *history.ProcessingQueueStates `json:"timer_processing_queue_states"`
TransferFailoverLevels map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel
TimerFailoverLevels map[string]TimerFailoverLevel // uuid -> TimerFailoverLevel
ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"`
DomainNotificationVersion int64 `json:"domain_notification_version"`
PendingFailoverMarkers []*replicator.FailoverMarkerAttributes `json:"pending_failover_markers"`
}

// TransferFailoverLevel contains corresponding start / end level
Expand Down
40 changes: 8 additions & 32 deletions common/persistence/persistence-tests/executionManagerTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4935,16 +4935,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
domainNotificationVersion := int64(8192)
transferPQS := createTransferPQS(cluster.TestCurrentClusterName, 0, currentClusterTransferAck,
cluster.TestAlternativeClusterName, 1, alternativeClusterTransferAck)
transferPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates(
&transferPQS,
common.EncodingTypeThriftRW,
)
timerPQS := createTimerPQS(cluster.TestCurrentClusterName, 0, currentClusterTimerAck,
cluster.TestAlternativeClusterName, 1, alternativeClusterTimerAck)
timerPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates(
&timerPQS,
common.EncodingTypeThriftRW,
)
shardInfo := &p.ShardInfo{
ShardID: shardID,
Owner: "some random owner",
Expand All @@ -4962,8 +4954,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
TransferProcessingQueueStates: transferPQSBlob,
TimerProcessingQueueStates: timerPQSBlob,
TransferProcessingQueueStates: &transferPQS,
TimerProcessingQueueStates: &timerPQS,
DomainNotificationVersion: domainNotificationVersion,
ClusterReplicationLevel: map[string]int64{},
ReplicationDLQAckLevel: map[string]int64{},
Expand All @@ -4980,12 +4972,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano())
s.Equal(shardInfo.TransferProcessingQueueStates, resp.ShardInfo.TransferProcessingQueueStates)
s.Equal(shardInfo.TimerProcessingQueueStates, resp.ShardInfo.TimerProcessingQueueStates)
deserializedTransferPQS, err := s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TransferProcessingQueueStates)
s.Nil(err)
s.Equal(&transferPQS, deserializedTransferPQS)
deserializedTimerPQS, err := s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TimerProcessingQueueStates)
s.Nil(err)
s.Equal(&timerPQS, deserializedTimerPQS)
s.Equal(&transferPQS, resp.ShardInfo.TransferProcessingQueueStates)
s.Equal(&timerPQS, resp.ShardInfo.TimerProcessingQueueStates)

resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel
resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt
Expand All @@ -5003,16 +4991,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
domainNotificationVersion = int64(16384)
transferPQS = createTransferPQS(cluster.TestCurrentClusterName, 0, currentClusterTransferAck,
cluster.TestAlternativeClusterName, 1, alternativeClusterTransferAck)
transferPQSBlob, _ = s.PayloadSerializer.SerializeProcessingQueueStates(
&transferPQS,
common.EncodingTypeThriftRW,
)
timerPQS = createTimerPQS(cluster.TestCurrentClusterName, 0, currentClusterTimerAck,
cluster.TestAlternativeClusterName, 1, alternativeClusterTimerAck)
timerPQSBlob, _ = s.PayloadSerializer.SerializeProcessingQueueStates(
&timerPQS,
common.EncodingTypeThriftRW,
)
shardInfo = &p.ShardInfo{
ShardID: shardID,
Owner: "some random owner",
Expand All @@ -5030,8 +5010,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
cluster.TestCurrentClusterName: currentClusterTimerAck,
cluster.TestAlternativeClusterName: alternativeClusterTimerAck,
},
TransferProcessingQueueStates: transferPQSBlob,
TimerProcessingQueueStates: timerPQSBlob,
TransferProcessingQueueStates: &transferPQS,
TimerProcessingQueueStates: &timerPQS,
DomainNotificationVersion: domainNotificationVersion,
ClusterReplicationLevel: map[string]int64{cluster.TestAlternativeClusterName: 12345},
ReplicationDLQAckLevel: map[string]int64{},
Expand All @@ -5050,12 +5030,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() {
s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano())
s.Equal(shardInfo.TransferProcessingQueueStates, resp.ShardInfo.TransferProcessingQueueStates)
s.Equal(shardInfo.TimerProcessingQueueStates, resp.ShardInfo.TimerProcessingQueueStates)
deserializedTransferPQS, err = s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TransferProcessingQueueStates)
s.Nil(err)
s.Equal(&transferPQS, deserializedTransferPQS)
deserializedTimerPQS, err = s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TimerProcessingQueueStates)
s.Nil(err)
s.Equal(&timerPQS, deserializedTimerPQS)
s.Equal(&transferPQS, resp.ShardInfo.TransferProcessingQueueStates)
s.Equal(&timerPQS, resp.ShardInfo.TimerProcessingQueueStates)

resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt
resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel
Expand Down
12 changes: 2 additions & 10 deletions common/persistence/persistence-tests/persistenceTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,6 @@ func (s *TestBase) Setup() {
},
}
transferPQS := history.ProcessingQueueStates{transferPQSMap}
transferPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates(
&transferPQS,
common.EncodingTypeThriftRW,
)
timerPQSMap := map[string][]*history.ProcessingQueueState{
s.ClusterMetadata.GetCurrentClusterName(): {
&history.ProcessingQueueState{
Expand All @@ -245,10 +241,6 @@ func (s *TestBase) Setup() {
},
}
timerPQS := history.ProcessingQueueStates{StatesByCluster: timerPQSMap}
timerPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates(
&timerPQS,
common.EncodingTypeThriftRW,
)

s.ShardInfo = &p.ShardInfo{
ShardID: shardID,
Expand All @@ -258,8 +250,8 @@ func (s *TestBase) Setup() {
TimerAckLevel: time.Time{},
ClusterTimerAckLevel: map[string]time.Time{clusterName: time.Time{}},
ClusterTransferAckLevel: map[string]int64{clusterName: 0},
TransferProcessingQueueStates: transferPQSBlob,
TimerProcessingQueueStates: timerPQSBlob,
TransferProcessingQueueStates: &transferPQS,
TimerProcessingQueueStates: &timerPQS,
}

s.TaskIDGenerator = &TestTransferTaskIDGenerator{}
Expand Down
72 changes: 57 additions & 15 deletions common/persistence/shardManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ package persistence

import (
"context"

"github.com/uber/cadence/common"
)

type (
	// shardManager dispatches shard CRUD operations to the underlying
	// ShardStore, converting between the manager-level ShardInfo (typed
	// structs) and the store-level InternalShardInfo (serialized DataBlobs).
	shardManager struct {
		persistence ShardStore        // underlying shard persistence implementation
		serializer  PayloadSerializer // serializes/deserializes shard data blobs
	}
)

Expand All @@ -40,6 +43,7 @@ func NewShardManager(
) ShardManager {
return &shardManager{
persistence: persistence,
serializer: NewPayloadSerializer(),
}
}

Expand All @@ -52,8 +56,12 @@ func (m *shardManager) Close() {
}

// CreateShard converts the request's ShardInfo to its internal (serialized)
// form and persists a new shard record via the underlying ShardStore.
// Returns a serialization error before touching persistence if the shard's
// data blobs cannot be encoded.
func (m *shardManager) CreateShard(ctx context.Context, request *CreateShardRequest) error {
	shardInfo, err := m.toInternalShardInfo(request.ShardInfo)
	if err != nil {
		return err
	}
	internalRequest := &InternalCreateShardRequest{
		ShardInfo: shardInfo,
	}
	return m.persistence.CreateShard(ctx, internalRequest)
}
Expand All @@ -66,24 +74,45 @@ func (m *shardManager) GetShard(ctx context.Context, request *GetShardRequest) (
if err != nil {
return nil, err
}
shardInfo, err := m.fromInternalShardInfo(internalResult.ShardInfo)
if err != nil {
return nil, err
}
result := &GetShardResponse{
ShardInfo: m.fromInternalShardInfo(internalResult.ShardInfo),
ShardInfo: shardInfo,
}
return result, nil
}

// UpdateShard converts the request's ShardInfo to its internal (serialized)
// form and updates the existing shard record. PreviousRangeID is passed
// through for the store's conditional (optimistic-concurrency) update.
// Returns a serialization error before touching persistence if the shard's
// data blobs cannot be encoded.
func (m *shardManager) UpdateShard(ctx context.Context, request *UpdateShardRequest) error {
	shardInfo, err := m.toInternalShardInfo(request.ShardInfo)
	if err != nil {
		return err
	}
	internalRequest := &InternalUpdateShardRequest{
		ShardInfo:       shardInfo,
		PreviousRangeID: request.PreviousRangeID,
	}
	return m.persistence.UpdateShard(ctx, internalRequest)
}

func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) *InternalShardInfo {
func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) (*InternalShardInfo, error) {
if shardInfo == nil {
return nil
return nil, nil
}
serializedTransferProcessingQueueStates, err := m.serializer.SerializeProcessingQueueStates(shardInfo.TransferProcessingQueueStates, common.EncodingTypeThriftRW)
if err != nil {
return nil, err
}
serializedTimerProcessingQueueStates, err := m.serializer.SerializeProcessingQueueStates(shardInfo.TimerProcessingQueueStates, common.EncodingTypeThriftRW)
if err != nil {
return nil, err
}
pendingFailoverMarker, err := m.serializer.SerializePendingFailoverMarkers(shardInfo.PendingFailoverMarkers, common.EncodingTypeThriftRW)
if err != nil {
return nil, err
}

return &InternalShardInfo{
ShardID: shardInfo.ShardID,
Owner: shardInfo.Owner,
Expand All @@ -96,20 +125,33 @@ func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) *InternalShardI
TimerAckLevel: shardInfo.TimerAckLevel,
ClusterTransferAckLevel: shardInfo.ClusterTransferAckLevel,
ClusterTimerAckLevel: shardInfo.ClusterTimerAckLevel,
TransferProcessingQueueStates: shardInfo.TransferProcessingQueueStates,
TimerProcessingQueueStates: shardInfo.TimerProcessingQueueStates,
TransferProcessingQueueStates: serializedTransferProcessingQueueStates,
TimerProcessingQueueStates: serializedTimerProcessingQueueStates,
ClusterReplicationLevel: shardInfo.ClusterReplicationLevel,
DomainNotificationVersion: shardInfo.DomainNotificationVersion,
PendingFailoverMarkers: shardInfo.PendingFailoverMarkers,
PendingFailoverMarkers: pendingFailoverMarker,
TransferFailoverLevels: shardInfo.TransferFailoverLevels,
TimerFailoverLevels: shardInfo.TimerFailoverLevels,
}
}, nil
}

func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInfo) *ShardInfo {
func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInfo) (*ShardInfo, error) {
if internalShardInfo == nil {
return nil
return nil, nil
}
transferProcessingQueueStates, err := m.serializer.DeserializeProcessingQueueStates(internalShardInfo.TransferProcessingQueueStates)
if err != nil {
return nil, err
}
timerProcessingQueueStates, err := m.serializer.DeserializeProcessingQueueStates(internalShardInfo.TimerProcessingQueueStates)
if err != nil {
return nil, err
}
pendingFailoverMarker, err := m.serializer.DeserializePendingFailoverMarkers(internalShardInfo.PendingFailoverMarkers)
if err != nil {
return nil, err
}

return &ShardInfo{
ShardID: internalShardInfo.ShardID,
Owner: internalShardInfo.Owner,
Expand All @@ -122,12 +164,12 @@ func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInf
TimerAckLevel: internalShardInfo.TimerAckLevel,
ClusterTransferAckLevel: internalShardInfo.ClusterTransferAckLevel,
ClusterTimerAckLevel: internalShardInfo.ClusterTimerAckLevel,
TransferProcessingQueueStates: internalShardInfo.TransferProcessingQueueStates,
TimerProcessingQueueStates: internalShardInfo.TimerProcessingQueueStates,
TransferProcessingQueueStates: transferProcessingQueueStates,
TimerProcessingQueueStates: timerProcessingQueueStates,
ClusterReplicationLevel: internalShardInfo.ClusterReplicationLevel,
DomainNotificationVersion: internalShardInfo.DomainNotificationVersion,
PendingFailoverMarkers: internalShardInfo.PendingFailoverMarkers,
PendingFailoverMarkers: pendingFailoverMarker,
TransferFailoverLevels: internalShardInfo.TransferFailoverLevels,
TimerFailoverLevels: internalShardInfo.TimerFailoverLevels,
}
}, nil
}
30 changes: 5 additions & 25 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,13 +305,7 @@ func (s *contextImpl) UpdateTransferProcessingQueueStates(cluster string, states
s.transferProcessingQueueStates.StatesByCluster = make(map[string][]*history.ProcessingQueueState)
}
s.transferProcessingQueueStates.StatesByCluster[cluster] = states
serializer := s.GetPayloadSerializer()
data, err := serializer.SerializeProcessingQueueStates(s.transferProcessingQueueStates, common.EncodingTypeThriftRW)
if err != nil {
s.logger.Error("Failed to serialize transfer processing queue states", tag.Error(err))
return err
}
s.shardInfo.TransferProcessingQueueStates = data
s.shardInfo.TransferProcessingQueueStates = s.transferProcessingQueueStates

// for backward compatibility
ackLevel := states[0].GetAckLevel()
Expand Down Expand Up @@ -478,13 +472,7 @@ func (s *contextImpl) UpdateTimerProcessingQueueStates(cluster string, states []
s.timerProcessingQueueStates.StatesByCluster = make(map[string][]*history.ProcessingQueueState)
}
s.timerProcessingQueueStates.StatesByCluster[cluster] = states
serializer := s.GetPayloadSerializer()
data, err := serializer.SerializeProcessingQueueStates(s.timerProcessingQueueStates, common.EncodingTypeThriftRW)
if err != nil {
s.logger.Error("Failed to serialize timer processing queue states", tag.Error(err))
return err
}
s.shardInfo.TimerProcessingQueueStates = data
s.shardInfo.TimerProcessingQueueStates = s.timerProcessingQueueStates

// for backward compatibility
ackLevel := states[0].GetAckLevel()
Expand Down Expand Up @@ -1451,14 +1439,7 @@ func (s *contextImpl) ValidateAndUpdateFailoverMarkers() ([]*replicator.Failover
}

func (s *contextImpl) updateFailoverMarkersInShardInfoLocked() error {

serializer := s.GetPayloadSerializer()
data, err := serializer.SerializePendingFailoverMarkers(s.pendingFailoverMarkers, common.EncodingTypeThriftRW)
if err != nil {
return err
}

s.shardInfo.PendingFailoverMarkers = data
s.shardInfo.PendingFailoverMarkers = s.pendingFailoverMarkers
return nil
}

Expand Down Expand Up @@ -1535,8 +1516,7 @@ func acquireShard(
timerMaxReadLevelMap[clusterName] = timerMaxReadLevelMap[clusterName].Truncate(time.Millisecond)
}

serializer := shardItem.GetPayloadSerializer()
transferProcessingQueueStates, err := serializer.DeserializeProcessingQueueStates(shardInfo.TransferProcessingQueueStates)
transferProcessingQueueStates := shardInfo.TransferProcessingQueueStates
if err != nil {
return nil, err
}
Expand All @@ -1545,7 +1525,7 @@ func acquireShard(
StatesByCluster: make(map[string][]*history.ProcessingQueueState),
}
}
timerProcessingQueueStates, err := serializer.DeserializeProcessingQueueStates(shardInfo.TimerProcessingQueueStates)
timerProcessingQueueStates := shardInfo.TimerProcessingQueueStates
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 88fd2c5

Please sign in to comment.