From 88fd2c55b362d716bab1c4325e5916f59dee28cf Mon Sep 17 00:00:00 2001 From: Anish Adhikari Date: Fri, 30 Oct 2020 13:07:42 -0700 Subject: [PATCH] Move serialization of datablobs to manager for shardManager (#3709) Co-authored-by: Andrew Dawson --- common/persistence/dataInterfaces.go | 39 +++++----- .../persistence-tests/executionManagerTest.go | 40 +++-------- .../persistence-tests/persistenceTestBase.go | 12 +--- common/persistence/shardManager.go | 72 +++++++++++++++---- service/history/shard/context.go | 30 ++------ 5 files changed, 93 insertions(+), 100 deletions(-) diff --git a/common/persistence/dataInterfaces.go b/common/persistence/dataInterfaces.go index f4d42a4e38f..30cd76e2022 100644 --- a/common/persistence/dataInterfaces.go +++ b/common/persistence/dataInterfaces.go @@ -26,6 +26,9 @@ import ( "strings" "time" + "github.com/uber/cadence/.gen/go/history" + "github.com/uber/cadence/.gen/go/replicator" + "github.com/pborman/uuid" workflow "github.com/uber/cadence/.gen/go/shared" @@ -243,24 +246,24 @@ type ( // ShardInfo describes a shard ShardInfo struct { - ShardID int `json:"shard_id"` - Owner string `json:"owner"` - RangeID int64 `json:"range_id"` - StolenSinceRenew int `json:"stolen_since_renew"` - UpdatedAt time.Time `json:"updated_at"` - ReplicationAckLevel int64 `json:"replication_ack_level"` - ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"` - TransferAckLevel int64 `json:"transfer_ack_level"` - TimerAckLevel time.Time `json:"timer_ack_level"` - ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"` - ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"` - TransferProcessingQueueStates *DataBlob `json:"transfer_processing_queue_states"` - TimerProcessingQueueStates *DataBlob `json:"timer_processing_queue_states"` - TransferFailoverLevels map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel - TimerFailoverLevels map[string]TimerFailoverLevel // uuid -> TimerFailoverLevel - ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"` - DomainNotificationVersion int64 `json:"domain_notification_version"` - PendingFailoverMarkers *DataBlob `json:"pending_failover_markers"` + ShardID int `json:"shard_id"` + Owner string `json:"owner"` + RangeID int64 `json:"range_id"` + StolenSinceRenew int `json:"stolen_since_renew"` + UpdatedAt time.Time `json:"updated_at"` + ReplicationAckLevel int64 `json:"replication_ack_level"` + ReplicationDLQAckLevel map[string]int64 `json:"replication_dlq_ack_level"` + TransferAckLevel int64 `json:"transfer_ack_level"` + TimerAckLevel time.Time `json:"timer_ack_level"` + ClusterTransferAckLevel map[string]int64 `json:"cluster_transfer_ack_level"` + ClusterTimerAckLevel map[string]time.Time `json:"cluster_timer_ack_level"` + TransferProcessingQueueStates *history.ProcessingQueueStates `json:"transfer_processing_queue_states"` + TimerProcessingQueueStates *history.ProcessingQueueStates `json:"timer_processing_queue_states"` + TransferFailoverLevels map[string]TransferFailoverLevel // uuid -> TransferFailoverLevel + TimerFailoverLevels map[string]TimerFailoverLevel // uuid -> TimerFailoverLevel + ClusterReplicationLevel map[string]int64 `json:"cluster_replication_level"` + DomainNotificationVersion int64 `json:"domain_notification_version"` + PendingFailoverMarkers []*replicator.FailoverMarkerAttributes `json:"pending_failover_markers"` } // TransferFailoverLevel contains corresponding start / end level diff --git a/common/persistence/persistence-tests/executionManagerTest.go b/common/persistence/persistence-tests/executionManagerTest.go index 9b273c15cca..46a4e021302 100644 --- a/common/persistence/persistence-tests/executionManagerTest.go +++ b/common/persistence/persistence-tests/executionManagerTest.go @@ -4935,16 +4935,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() { domainNotificationVersion := int64(8192) transferPQS := createTransferPQS(cluster.TestCurrentClusterName, 0, currentClusterTransferAck, cluster.TestAlternativeClusterName, 1, alternativeClusterTransferAck) - transferPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates( - &transferPQS, - common.EncodingTypeThriftRW, - ) timerPQS := createTimerPQS(cluster.TestCurrentClusterName, 0, currentClusterTimerAck, cluster.TestAlternativeClusterName, 1, alternativeClusterTimerAck) - timerPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates( - &timerPQS, - common.EncodingTypeThriftRW, - ) shardInfo := &p.ShardInfo{ ShardID: shardID, Owner: "some random owner", @@ -4962,8 +4954,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() { cluster.TestCurrentClusterName: currentClusterTimerAck, cluster.TestAlternativeClusterName: alternativeClusterTimerAck, }, - TransferProcessingQueueStates: transferPQSBlob, - TimerProcessingQueueStates: timerPQSBlob, + TransferProcessingQueueStates: &transferPQS, + TimerProcessingQueueStates: &timerPQS, DomainNotificationVersion: domainNotificationVersion, ClusterReplicationLevel: map[string]int64{}, ReplicationDLQAckLevel: map[string]int64{}, @@ -4980,12 +4972,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() { s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano()) s.Equal(shardInfo.TransferProcessingQueueStates, resp.ShardInfo.TransferProcessingQueueStates) s.Equal(shardInfo.TimerProcessingQueueStates, resp.ShardInfo.TimerProcessingQueueStates) - deserializedTransferPQS, err := s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TransferProcessingQueueStates) - s.Nil(err) - s.Equal(&transferPQS, deserializedTransferPQS) - deserializedTimerPQS, err := s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TimerProcessingQueueStates) - s.Nil(err) - s.Equal(&timerPQS, deserializedTimerPQS) + s.Equal(&transferPQS, resp.ShardInfo.TransferProcessingQueueStates) + s.Equal(&timerPQS, resp.ShardInfo.TimerProcessingQueueStates) resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt @@ -5003,16 +4991,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() { domainNotificationVersion = int64(16384) transferPQS = createTransferPQS(cluster.TestCurrentClusterName, 0, currentClusterTransferAck, cluster.TestAlternativeClusterName, 1, alternativeClusterTransferAck) - transferPQSBlob, _ = s.PayloadSerializer.SerializeProcessingQueueStates( - &transferPQS, - common.EncodingTypeThriftRW, - ) timerPQS = createTimerPQS(cluster.TestCurrentClusterName, 0, currentClusterTimerAck, cluster.TestAlternativeClusterName, 1, alternativeClusterTimerAck) - timerPQSBlob, _ = s.PayloadSerializer.SerializeProcessingQueueStates( - &timerPQS, - common.EncodingTypeThriftRW, - ) shardInfo = &p.ShardInfo{ ShardID: shardID, Owner: "some random owner", @@ -5030,8 +5010,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() { cluster.TestCurrentClusterName: currentClusterTimerAck, cluster.TestAlternativeClusterName: alternativeClusterTimerAck, }, - TransferProcessingQueueStates: transferPQSBlob, - TimerProcessingQueueStates: timerPQSBlob, + TransferProcessingQueueStates: &transferPQS, + TimerProcessingQueueStates: &timerPQS, DomainNotificationVersion: domainNotificationVersion, ClusterReplicationLevel: map[string]int64{cluster.TestAlternativeClusterName: 12345}, ReplicationDLQAckLevel: map[string]int64{}, @@ -5050,12 +5030,8 @@ func (s *ExecutionManagerSuite) TestCreateGetUpdateGetShard() { s.Equal(shardInfo.TimerAckLevel.UnixNano(), resp.ShardInfo.TimerAckLevel.UnixNano()) s.Equal(shardInfo.TransferProcessingQueueStates, resp.ShardInfo.TransferProcessingQueueStates) s.Equal(shardInfo.TimerProcessingQueueStates, resp.ShardInfo.TimerProcessingQueueStates) - deserializedTransferPQS, err = s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TransferProcessingQueueStates) - s.Nil(err) - s.Equal(&transferPQS, deserializedTransferPQS) - deserializedTimerPQS, err = s.PayloadSerializer.DeserializeProcessingQueueStates(resp.ShardInfo.TimerProcessingQueueStates) - s.Nil(err) - s.Equal(&timerPQS, deserializedTimerPQS) + s.Equal(&transferPQS, resp.ShardInfo.TransferProcessingQueueStates) + s.Equal(&timerPQS, resp.ShardInfo.TimerProcessingQueueStates) resp.ShardInfo.UpdatedAt = shardInfo.UpdatedAt resp.ShardInfo.TimerAckLevel = shardInfo.TimerAckLevel diff --git a/common/persistence/persistence-tests/persistenceTestBase.go b/common/persistence/persistence-tests/persistenceTestBase.go index 5906b8b6005..430f5da2993 100644 --- a/common/persistence/persistence-tests/persistenceTestBase.go +++ b/common/persistence/persistence-tests/persistenceTestBase.go @@ -230,10 +230,6 @@ func (s *TestBase) Setup() { }, } transferPQS := history.ProcessingQueueStates{transferPQSMap} - transferPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates( - &transferPQS, - common.EncodingTypeThriftRW, - ) timerPQSMap := map[string][]*history.ProcessingQueueState{ s.ClusterMetadata.GetCurrentClusterName(): { &history.ProcessingQueueState{ @@ -245,10 +241,6 @@ func (s *TestBase) Setup() { }, } timerPQS := history.ProcessingQueueStates{StatesByCluster: timerPQSMap} - timerPQSBlob, _ := s.PayloadSerializer.SerializeProcessingQueueStates( - &timerPQS, - common.EncodingTypeThriftRW, - ) s.ShardInfo = &p.ShardInfo{ ShardID: shardID, @@ -258,8 +250,8 @@ func (s *TestBase) Setup() { TimerAckLevel: time.Time{}, ClusterTimerAckLevel: map[string]time.Time{clusterName: time.Time{}}, ClusterTransferAckLevel: map[string]int64{clusterName: 0}, - TransferProcessingQueueStates: transferPQSBlob, - TimerProcessingQueueStates: timerPQSBlob, + TransferProcessingQueueStates: &transferPQS, + TimerProcessingQueueStates: &timerPQS, } s.TaskIDGenerator = &TestTransferTaskIDGenerator{} diff --git a/common/persistence/shardManager.go b/common/persistence/shardManager.go index b01b4601abd..f56b8f0e75d 100644 --- a/common/persistence/shardManager.go +++ b/common/persistence/shardManager.go @@ -24,11 +24,14 @@ package persistence import ( "context" + + "github.com/uber/cadence/common" ) type ( shardManager struct { persistence ShardStore + serializer PayloadSerializer } ) @@ -40,6 +43,7 @@ func NewShardManager( ) ShardManager { return &shardManager{ persistence: persistence, + serializer: NewPayloadSerializer(), } } @@ -52,8 +56,12 @@ func (m *shardManager) Close() { } func (m *shardManager) CreateShard(ctx context.Context, request *CreateShardRequest) error { + shardInfo, err := m.toInternalShardInfo(request.ShardInfo) + if err != nil { + return err + } internalRequest := &InternalCreateShardRequest{ - ShardInfo: m.toInternalShardInfo(request.ShardInfo), + ShardInfo: shardInfo, } return m.persistence.CreateShard(ctx, internalRequest) } @@ -66,24 +74,45 @@ func (m *shardManager) GetShard(ctx context.Context, request *GetShardRequest) ( if err != nil { return nil, err } + shardInfo, err := m.fromInternalShardInfo(internalResult.ShardInfo) + if err != nil { + return nil, err + } result := &GetShardResponse{ - ShardInfo: m.fromInternalShardInfo(internalResult.ShardInfo), + ShardInfo: shardInfo, } return result, nil } func (m *shardManager) UpdateShard(ctx context.Context, request *UpdateShardRequest) error { + shardInfo, err := m.toInternalShardInfo(request.ShardInfo) + if err != nil { + return err + } internalRequest := &InternalUpdateShardRequest{ - ShardInfo: m.toInternalShardInfo(request.ShardInfo), + ShardInfo: shardInfo, PreviousRangeID: request.PreviousRangeID, } return m.persistence.UpdateShard(ctx, internalRequest) } -func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) *InternalShardInfo { +func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) (*InternalShardInfo, error) { if shardInfo == nil { - return nil + return nil, nil + } + serializedTransferProcessingQueueStates, err := m.serializer.SerializeProcessingQueueStates(shardInfo.TransferProcessingQueueStates, common.EncodingTypeThriftRW) + if err != nil { + return nil, err + } + serializedTimerProcessingQueueStates, err := m.serializer.SerializeProcessingQueueStates(shardInfo.TimerProcessingQueueStates, common.EncodingTypeThriftRW) + if err != nil { + return nil, err + } + pendingFailoverMarker, err := m.serializer.SerializePendingFailoverMarkers(shardInfo.PendingFailoverMarkers, common.EncodingTypeThriftRW) + if err != nil { + return nil, err } + return &InternalShardInfo{ ShardID: shardInfo.ShardID, Owner: shardInfo.Owner, @@ -96,20 +125,33 @@ func (m *shardManager) toInternalShardInfo(shardInfo *ShardInfo) *InternalShardI TimerAckLevel: shardInfo.TimerAckLevel, ClusterTransferAckLevel: shardInfo.ClusterTransferAckLevel, ClusterTimerAckLevel: shardInfo.ClusterTimerAckLevel, - TransferProcessingQueueStates: shardInfo.TransferProcessingQueueStates, - TimerProcessingQueueStates: shardInfo.TimerProcessingQueueStates, + TransferProcessingQueueStates: serializedTransferProcessingQueueStates, + TimerProcessingQueueStates: serializedTimerProcessingQueueStates, ClusterReplicationLevel: shardInfo.ClusterReplicationLevel, DomainNotificationVersion: shardInfo.DomainNotificationVersion, - PendingFailoverMarkers: shardInfo.PendingFailoverMarkers, + PendingFailoverMarkers: pendingFailoverMarker, TransferFailoverLevels: shardInfo.TransferFailoverLevels, TimerFailoverLevels: shardInfo.TimerFailoverLevels, - } + }, nil } -func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInfo) *ShardInfo { +func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInfo) (*ShardInfo, error) { if internalShardInfo == nil { - return nil + return nil, nil } + transferProcessingQueueStates, err := m.serializer.DeserializeProcessingQueueStates(internalShardInfo.TransferProcessingQueueStates) + if err != nil { + return nil, err + } + timerProcessingQueueStates, err := m.serializer.DeserializeProcessingQueueStates(internalShardInfo.TimerProcessingQueueStates) + if err != nil { + return nil, err + } + pendingFailoverMarker, err := m.serializer.DeserializePendingFailoverMarkers(internalShardInfo.PendingFailoverMarkers) + if err != nil { + return nil, err + } + return &ShardInfo{ ShardID: internalShardInfo.ShardID, Owner: internalShardInfo.Owner, @@ -122,12 +164,12 @@ func (m *shardManager) fromInternalShardInfo(internalShardInfo *InternalShardInf TimerAckLevel: internalShardInfo.TimerAckLevel, ClusterTransferAckLevel: internalShardInfo.ClusterTransferAckLevel, ClusterTimerAckLevel: internalShardInfo.ClusterTimerAckLevel, - TransferProcessingQueueStates: internalShardInfo.TransferProcessingQueueStates, - TimerProcessingQueueStates: internalShardInfo.TimerProcessingQueueStates, + TransferProcessingQueueStates: transferProcessingQueueStates, + TimerProcessingQueueStates: timerProcessingQueueStates, ClusterReplicationLevel: internalShardInfo.ClusterReplicationLevel, DomainNotificationVersion: internalShardInfo.DomainNotificationVersion, - PendingFailoverMarkers: internalShardInfo.PendingFailoverMarkers, + PendingFailoverMarkers: pendingFailoverMarker, TransferFailoverLevels: internalShardInfo.TransferFailoverLevels, TimerFailoverLevels: internalShardInfo.TimerFailoverLevels, - } + }, nil } diff --git a/service/history/shard/context.go b/service/history/shard/context.go index 5a8c51c7aa9..209d09b1183 100644 --- a/service/history/shard/context.go +++ b/service/history/shard/context.go @@ -305,13 +305,7 @@ func (s *contextImpl) UpdateTransferProcessingQueueStates(cluster string, states s.transferProcessingQueueStates.StatesByCluster = make(map[string][]*history.ProcessingQueueState) } s.transferProcessingQueueStates.StatesByCluster[cluster] = states - serializer := s.GetPayloadSerializer() - data, err := serializer.SerializeProcessingQueueStates(s.transferProcessingQueueStates, common.EncodingTypeThriftRW) - if err != nil { - s.logger.Error("Failed to serialize transfer processing queue states", tag.Error(err)) - return err - } - s.shardInfo.TransferProcessingQueueStates = data + s.shardInfo.TransferProcessingQueueStates = s.transferProcessingQueueStates // for backward compatibility ackLevel := states[0].GetAckLevel() @@ -478,13 +472,7 @@ func (s *contextImpl) UpdateTimerProcessingQueueStates(cluster string, states [] s.timerProcessingQueueStates.StatesByCluster = make(map[string][]*history.ProcessingQueueState) } s.timerProcessingQueueStates.StatesByCluster[cluster] = states - serializer := s.GetPayloadSerializer() - data, err := serializer.SerializeProcessingQueueStates(s.timerProcessingQueueStates, common.EncodingTypeThriftRW) - if err != nil { - s.logger.Error("Failed to serialize timer processing queue states", tag.Error(err)) - return err - } - s.shardInfo.TimerProcessingQueueStates = data + s.shardInfo.TimerProcessingQueueStates = s.timerProcessingQueueStates // for backward compatibility ackLevel := states[0].GetAckLevel() @@ -1451,14 +1439,7 @@ func (s *contextImpl) ValidateAndUpdateFailoverMarkers() ([]*replicator.Failover } func (s *contextImpl) updateFailoverMarkersInShardInfoLocked() error { - - serializer := s.GetPayloadSerializer() - data, err := serializer.SerializePendingFailoverMarkers(s.pendingFailoverMarkers, common.EncodingTypeThriftRW) - if err != nil { - return err - } - - s.shardInfo.PendingFailoverMarkers = data + s.shardInfo.PendingFailoverMarkers = s.pendingFailoverMarkers return nil } @@ -1535,8 +1516,7 @@ func acquireShard( timerMaxReadLevelMap[clusterName] = timerMaxReadLevelMap[clusterName].Truncate(time.Millisecond) } - serializer := shardItem.GetPayloadSerializer() - transferProcessingQueueStates, err := serializer.DeserializeProcessingQueueStates(shardInfo.TransferProcessingQueueStates) + transferProcessingQueueStates := shardInfo.TransferProcessingQueueStates if err != nil { return nil, err } @@ -1545,7 +1525,7 @@ func acquireShard( StatesByCluster: make(map[string][]*history.ProcessingQueueState), } } - timerProcessingQueueStates, err := serializer.DeserializeProcessingQueueStates(shardInfo.TimerProcessingQueueStates) + timerProcessingQueueStates := shardInfo.TimerProcessingQueueStates if err != nil { return nil, err }