Skip to content

Commit

Permalink
Queue refactor part 4: task serializer (#2490)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Feb 11, 2022
1 parent 131563b commit 3afd57d
Show file tree
Hide file tree
Showing 17 changed files with 236 additions and 278 deletions.
15 changes: 9 additions & 6 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.temporal.io/server/common/metrics"
p "go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/cassandra"
"go.temporal.io/server/common/persistence/serialization"
"go.temporal.io/server/common/persistence/sql"
"go.temporal.io/server/common/quotas"
"go.temporal.io/server/common/resolver"
Expand Down Expand Up @@ -96,6 +97,7 @@ type (
factoryImpl struct {
sync.RWMutex
config *config.Persistence
serializer serialization.Serializer
abstractDataStoreFactory AbstractDataStoreFactory
faultInjection *FaultInjectionDataStoreFactory
metricsClient metrics.Client
Expand Down Expand Up @@ -158,6 +160,7 @@ func NewFactoryImpl(
) *factoryImpl {
factory := &factoryImpl{
config: cfg,
serializer: serialization.NewSerializer(),
abstractDataStoreFactory: abstractDataStoreFactory,
metricsClient: metricsClient,
logger: logger,
Expand All @@ -175,7 +178,7 @@ func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {
if err != nil {
return nil, err
}
result := p.NewTaskManager(taskStore)
result := p.NewTaskManager(taskStore, f.serializer)
if ds.ratelimit != nil {
result = p.NewTaskPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
Expand All @@ -189,7 +192,7 @@ func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {
func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
ds := f.datastores[storeTypeShard]
shardStore, err := ds.factory.NewShardStore()
result := p.NewShardManager(shardStore)
result := p.NewShardManager(shardStore, f.serializer)
if err != nil {
return nil, err
}
Expand All @@ -212,7 +215,7 @@ func (f *factoryImpl) NewMetadataManager() (p.MetadataManager, error) {
return nil, err
}

result := p.NewMetadataManagerImpl(store, f.logger, f.clusterName)
result := p.NewMetadataManagerImpl(store, f.serializer, f.logger, f.clusterName)
if ds.ratelimit != nil {
result = p.NewMetadataPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
Expand All @@ -232,7 +235,7 @@ func (f *factoryImpl) NewClusterMetadataManager() (p.ClusterMetadataManager, err
return nil, err
}

result := p.NewClusterMetadataManagerImpl(store, f.clusterName, f.logger)
result := p.NewClusterMetadataManagerImpl(store, f.serializer, f.clusterName, f.logger)
if ds.ratelimit != nil {
result = p.NewClusterMetadataPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
Expand All @@ -250,7 +253,7 @@ func (f *factoryImpl) NewExecutionManager() (p.ExecutionManager, error) {
if err != nil {
return nil, err
}
result := p.NewExecutionManager(store, f.logger, f.config.TransactionSizeLimit)
result := p.NewExecutionManager(store, f.serializer, f.logger, f.config.TransactionSizeLimit)
if ds.ratelimit != nil {
result = p.NewExecutionPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
Expand All @@ -273,7 +276,7 @@ func (f *factoryImpl) NewNamespaceReplicationQueue() (p.NamespaceReplicationQueu
result = p.NewQueuePersistenceMetricsClient(result, f.metricsClient, f.logger)
}

return p.NewNamespaceReplicationQueue(result, f.clusterName, f.metricsClient, f.logger)
return p.NewNamespaceReplicationQueue(result, f.serializer, f.clusterName, f.metricsClient, f.logger)
}

// Close closes this factory
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/clusterMetadataStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ var _ ClusterMetadataManager = (*clusterMetadataManagerImpl)(nil)
//NewClusterMetadataManagerImpl returns new ClusterMetadataManager
func NewClusterMetadataManagerImpl(
persistence ClusterMetadataStore,
serializer serialization.Serializer,
currentClusterName string,
logger log.Logger,
) ClusterMetadataManager {
return &clusterMetadataManagerImpl{
serializer: serialization.NewSerializer(),
serializer: serializer,
persistence: persistence,
currentClusterName: currentClusterName,
logger: logger,
Expand Down
45 changes: 23 additions & 22 deletions common/persistence/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ var _ ExecutionManager = (*executionManagerImpl)(nil)
// NewExecutionManager returns new ExecutionManager
func NewExecutionManager(
persistence ExecutionStore,
serializer serialization.Serializer,
logger log.Logger,
transactionSizeLimit dynamicconfig.IntPropertyFn,
) ExecutionManager {

return &executionManagerImpl{
serializer: serialization.NewSerializer(),
serializer: serializer,
persistence: persistence,
logger: logger,
pagingTokenSerializer: newJSONHistoryTokenSerializer(),
Expand Down Expand Up @@ -427,19 +428,19 @@ func (m *executionManagerImpl) SerializeWorkflowMutation(

var err error

transferTasks, err := m.serializer.SerializeTransferTasks(input.Tasks[tasks.CategoryTransfer])
transferTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryTransfer])
if err != nil {
return nil, err
}
timerTasks, err := m.serializer.SerializeTimerTasks(input.Tasks[tasks.CategoryTimer])
timerTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryTimer])
if err != nil {
return nil, err
}
replicationTasks, err := m.serializer.SerializeReplicationTasks(input.Tasks[tasks.CategoryReplication])
replicationTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryReplication])
if err != nil {
return nil, err
}
visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.Tasks[tasks.CategoryVisibility])
visibilityTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryVisibility])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -546,19 +547,19 @@ func (m *executionManagerImpl) SerializeWorkflowSnapshot(

var err error

transferTasks, err := m.serializer.SerializeTransferTasks(input.Tasks[tasks.CategoryTransfer])
transferTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryTransfer])
if err != nil {
return nil, err
}
timerTasks, err := m.serializer.SerializeTimerTasks(input.Tasks[tasks.CategoryTimer])
timerTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryTimer])
if err != nil {
return nil, err
}
replicationTasks, err := m.serializer.SerializeReplicationTasks(input.Tasks[tasks.CategoryReplication])
replicationTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryReplication])
if err != nil {
return nil, err
}
visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.Tasks[tasks.CategoryVisibility])
visibilityTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryVisibility])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -701,19 +702,19 @@ func (m *executionManagerImpl) AddTasks(
input *AddTasksRequest,
) error {

transferTasks, err := m.serializer.SerializeTransferTasks(input.Tasks[tasks.CategoryTransfer])
transferTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryTransfer])
if err != nil {
return err
}
timerTasks, err := m.serializer.SerializeTimerTasks(input.Tasks[tasks.CategoryTimer])
timerTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryTimer])
if err != nil {
return err
}
replicationTasks, err := m.serializer.SerializeReplicationTasks(input.Tasks[tasks.CategoryReplication])
replicationTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryReplication])
if err != nil {
return err
}
visibilityTasks, err := m.serializer.SerializeVisibilityTasks(input.Tasks[tasks.CategoryVisibility])
visibilityTasks, err := m.serializer.SerializeTasks(input.Tasks[tasks.CategoryVisibility])
if err != nil {
return err
}
Expand Down Expand Up @@ -742,7 +743,7 @@ func (m *executionManagerImpl) GetTransferTask(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeTransferTasks([]commonpb.DataBlob{resp.Task})
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryTransfer, []commonpb.DataBlob{resp.Task})
if err != nil {
return nil, err
}
Expand All @@ -756,7 +757,7 @@ func (m *executionManagerImpl) GetTransferTasks(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeTransferTasks(resp.Tasks)
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryTransfer, resp.Tasks)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -784,7 +785,7 @@ func (m *executionManagerImpl) GetTimerTask(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeTimerTasks([]commonpb.DataBlob{resp.Task})
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryTimer, []commonpb.DataBlob{resp.Task})
if err != nil {
return nil, err
}
Expand All @@ -798,7 +799,7 @@ func (m *executionManagerImpl) GetTimerTasks(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeTimerTasks(resp.Tasks)
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryTimer, resp.Tasks)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -826,7 +827,7 @@ func (m *executionManagerImpl) GetReplicationTask(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeReplicationTasks([]commonpb.DataBlob{resp.Task})
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryReplication, []commonpb.DataBlob{resp.Task})
if err != nil {
return nil, err
}
Expand All @@ -840,7 +841,7 @@ func (m *executionManagerImpl) GetReplicationTasks(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeReplicationTasks(resp.Tasks)
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryReplication, resp.Tasks)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -872,7 +873,7 @@ func (m *executionManagerImpl) GetReplicationTasksFromDLQ(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeReplicationTasks(resp.Tasks)
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryReplication, resp.Tasks)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -900,7 +901,7 @@ func (m *executionManagerImpl) GetVisibilityTask(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeVisibilityTasks([]commonpb.DataBlob{resp.Task})
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryVisibility, []commonpb.DataBlob{resp.Task})
if err != nil {
return nil, err
}
Expand All @@ -914,7 +915,7 @@ func (m *executionManagerImpl) GetVisibilityTasks(
if err != nil {
return nil, err
}
tasks, err := m.serializer.DeserializeVisibilityTasks(resp.Tasks)
tasks, err := m.serializer.DeserializeTasks(tasks.CategoryVisibility, resp.Tasks)
if err != nil {
return nil, err
}
Expand Down
9 changes: 7 additions & 2 deletions common/persistence/metadata_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,14 @@ type (
var _ MetadataManager = (*metadataManagerImpl)(nil)

//NewMetadataManagerImpl returns new MetadataManager
func NewMetadataManagerImpl(persistence MetadataStore, logger log.Logger, clusterName string) MetadataManager {
func NewMetadataManagerImpl(
persistence MetadataStore,
serializer serialization.Serializer,
logger log.Logger,
clusterName string,
) MetadataManager {
return &metadataManagerImpl{
serializer: serialization.NewSerializer(),
serializer: serializer,
persistence: persistence,
logger: logger,
clusterName: clusterName,
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/namespaceReplicationQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ var _ NamespaceReplicationQueue = (*namespaceReplicationQueueImpl)(nil)
// NewNamespaceReplicationQueue creates a new NamespaceReplicationQueue instance
func NewNamespaceReplicationQueue(
queue Queue,
serializer serialization.Serializer,
clusterName string,
metricsClient metrics.Client,
logger log.Logger,
) (NamespaceReplicationQueue, error) {
serializer := serialization.NewSerializer()

blob, err := serializer.QueueMetadataToBlob(
&persistence.QueueMetadata{
Expand Down
58 changes: 2 additions & 56 deletions common/persistence/serialization/serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,26 +102,8 @@ type (
ReplicationTaskToBlob(replicationTask *replicationspb.ReplicationTask, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error)
ReplicationTaskFromBlob(data *commonpb.DataBlob) (*replicationspb.ReplicationTask, error)

SerializeTransferTasks(taskSlice []tasks.Task) (map[tasks.Key]commonpb.DataBlob, error)
DeserializeTransferTasks(blobSlice []commonpb.DataBlob) ([]tasks.Task, error)

SerializeTimerTasks(taskSlice []tasks.Task) (map[tasks.Key]commonpb.DataBlob, error)
DeserializeTimerTasks(blobSlice []commonpb.DataBlob) ([]tasks.Task, error)
SerializeVisibilityTasks(taskSlice []tasks.Task) (map[tasks.Key]commonpb.DataBlob, error)
DeserializeVisibilityTasks(blobSlice []commonpb.DataBlob) ([]tasks.Task, error)
SerializeReplicationTasks(taskSlice []tasks.Task) (map[tasks.Key]commonpb.DataBlob, error)
DeserializeReplicationTasks(blobSlice []commonpb.DataBlob) ([]tasks.Task, error)

// TODO deprecate method below in favor of methods above (tasks)

TransferTaskInfoToBlob(info *persistencespb.TransferTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error)
TransferTaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.TransferTaskInfo, error)
TimerTaskInfoToBlob(info *persistencespb.TimerTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error)
TimerTaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.TimerTaskInfo, error)
ReplicationTaskInfoToBlob(info *persistencespb.ReplicationTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error)
ReplicationTaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.ReplicationTaskInfo, error)
VisibilityTaskInfoToBlob(info *persistencespb.VisibilityTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error)
VisibilityTaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.VisibilityTaskInfo, error)
SerializeTasks(taskSlice []tasks.Task) (map[tasks.Key]commonpb.DataBlob, error)
DeserializeTasks(category tasks.Category, blobSlice []commonpb.DataBlob) ([]tasks.Task, error)
}

// SerializationError is an error type for serialization
Expand Down Expand Up @@ -442,42 +424,6 @@ func (t *serializerImpl) TaskQueueInfoFromBlob(data *commonpb.DataBlob) (*persis
return result, proto3DecodeBlob(data, result)
}

func (t *serializerImpl) TransferTaskInfoToBlob(info *persistencespb.TransferTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
}

func (t *serializerImpl) TransferTaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.TransferTaskInfo, error) {
result := &persistencespb.TransferTaskInfo{}
return result, proto3DecodeBlob(data, result)
}

func (t *serializerImpl) TimerTaskInfoToBlob(info *persistencespb.TimerTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
}

func (t *serializerImpl) TimerTaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.TimerTaskInfo, error) {
result := &persistencespb.TimerTaskInfo{}
return result, proto3DecodeBlob(data, result)
}

func (t *serializerImpl) ReplicationTaskInfoToBlob(info *persistencespb.ReplicationTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
}

func (t *serializerImpl) ReplicationTaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.ReplicationTaskInfo, error) {
result := &persistencespb.ReplicationTaskInfo{}
return result, proto3DecodeBlob(data, result)
}

func (t *serializerImpl) VisibilityTaskInfoToBlob(info *persistencespb.VisibilityTaskInfo, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
return proto3EncodeBlob(info, encodingType)
}

func (t *serializerImpl) VisibilityTaskInfoFromBlob(data *commonpb.DataBlob) (*persistencespb.VisibilityTaskInfo, error) {
result := &persistencespb.VisibilityTaskInfo{}
return result, proto3DecodeBlob(data, result)
}

func (t *serializerImpl) ChecksumToBlob(checksum *persistencespb.Checksum, encodingType enumspb.EncodingType) (*commonpb.DataBlob, error) {
// nil is replaced with empty object because it is not supported for "checksum" field in DB.
if checksum == nil {
Expand Down
Loading

0 comments on commit 3afd57d

Please sign in to comment.